Diffstat (limited to 'weed/mq/client')
-rw-r--r--  weed/mq/client/cmd/weed_pub/publisher.go                 | 13
-rw-r--r--  weed/mq/client/pub_client/publish.go                     | 17
-rw-r--r--  weed/mq/client/pub_client/publisher.go                   |  1
-rw-r--r--  weed/mq/client/pub_client/scheduler.go                   | 35
-rw-r--r--  weed/mq/client/sub_client/connect_to_sub_coordinator.go  | 21
5 files changed, 66 insertions(+), 21 deletions(-)
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 2873ba21f..e9227130a 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -16,6 +16,8 @@ var (
 	concurrency    = flag.Int("c", 4, "concurrent publishers")
 	partitionCount = flag.Int("p", 6, "partition count")
 
+	clientName = flag.String("client", "c1", "client name")
+	namespace  = flag.String("ns", "test", "namespace")
 	t           = flag.String("t", "test", "t")
 	seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
 
@@ -25,16 +27,20 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
 	startTime := time.Now()
 	for i := 0; i < *messageCount / *concurrency; i++ {
 		// Simulate publishing a message
-		key := []byte(fmt.Sprintf("key-%d-%d", id, i))
-		value := []byte(fmt.Sprintf("value-%d-%d", id, i))
+		key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i))
+		value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i))
 		if err := publisher.Publish(key, value); err != nil {
 			fmt.Println(err)
 			break
 		}
+		time.Sleep(time.Second)
 		// println("Published", string(key), string(value))
 	}
+	if err := publisher.FinishPublish(); err != nil {
+		fmt.Println(err)
+	}
 	elapsed := time.Since(startTime)
-	log.Printf("Publisher %d finished in %s", id, elapsed)
+	log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
 }
 
@@ -44,6 +50,7 @@ func main() {
 		CreateTopic:               true,
 		CreateTopicPartitionCount: int32(*partitionCount),
 		Brokers:                   strings.Split(*seedBrokers, ","),
+		PublisherName:             *clientName,
 	}
 
 	publisher := pub_client.NewTopicPublisher(config)
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1c5891049..fbb07b042 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -5,6 +5,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
+	"time"
 )
 
 func (p *TopicPublisher) Publish(key, value []byte) error {
@@ -20,5 +21,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
 	return inputBuffer.Enqueue(&mq_pb.DataMessage{
 		Key:   key,
 		Value: value,
+		TsNs:  time.Now().UnixNano(),
 	})
 }
+
+func (p *TopicPublisher) FinishPublish() error {
+	if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
+		for _, inputBuffer := range inputBuffers {
+			inputBuffer.Enqueue(&mq_pb.DataMessage{
+				TsNs: time.Now().UnixNano(),
+				Ctrl: &mq_pb.ControlMessage{
+					IsClose: true,
+				},
+			})
+		}
+	}
+
+	return nil
+}
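The `TsNs` stamp and the new `FinishPublish` turn shutdown into an in-band signal: rather than abruptly closing the gRPC stream, the publisher enqueues a `ControlMessage{IsClose: true}` into every partition buffer and lets the scheduler drain. Below is a minimal caller sketch mirroring the weed_pub changes above — the configuration fields and the `NewTopicPublisher`/`Publish`/`FinishPublish` calls are taken from this diff, but any topic-identifying configuration outside these hunks is elided, so treat it as illustrative rather than a complete program:

```go
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
)

func main() {
	config := &pub_client.PublisherConfiguration{
		CreateTopic:               true,
		CreateTopicPartitionCount: 6,
		Brokers:                   []string{"localhost:17777"},
		PublisherName:             "c1", // "for debugging", per this commit
		// NOTE: topic-identifying fields not visible in this diff are omitted.
	}
	publisher := pub_client.NewTopicPublisher(config)

	for i := 0; i < 10; i++ {
		// Publish now stamps each DataMessage with TsNs = time.Now().UnixNano(),
		// which the scheduler later compares against broker ack sequences.
		key := []byte(fmt.Sprintf("key-%d", i))
		value := []byte(fmt.Sprintf("value-%d", i))
		if err := publisher.Publish(key, value); err != nil {
			fmt.Println(err)
			break
		}
	}

	// FinishPublish enqueues a ControlMessage{IsClose: true} into every
	// partition's input buffer so each broker stream can drain and stop.
	if err := publisher.FinishPublish(); err != nil {
		fmt.Println(err)
	}
}
```

The design choice is that close travels the same queue as data, so it cannot overtake unsent messages — which is exactly what the scheduler's drain logic below relies on.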
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 9262d6e0c..09984bae3 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -17,6 +17,7 @@ type PublisherConfiguration struct {
 	CreateTopic               bool
 	CreateTopicPartitionCount int32
 	Brokers                   []string
+	PublisherName             string // for debugging
 }
 
 type PublishClient struct {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e6caf896c..e92e07ab5 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
 	"log"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -145,11 +146,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
 				Partition:       job.Partition,
 				AckInterval:     128,
 				FollowerBrokers: job.FollowerBrokers,
+				PublisherName:   p.config.PublisherName,
 			},
 		},
 	}); err != nil {
 		return fmt.Errorf("send init message: %v", err)
 	}
+	// process the hello message
 	resp, err := stream.Recv()
 	if err != nil {
 		return fmt.Errorf("recv init response: %v", err)
@@ -158,31 +161,44 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
 		return fmt.Errorf("init response error: %v", resp.Error)
 	}
 
+	var publishedTsNs int64
+	hasMoreData := int32(1)
+	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for {
 			ackResp, err := publishClient.Recv()
 			if err != nil {
 				e, _ := status.FromError(err)
 				if e.Code() == codes.Unknown && e.Message() == "EOF" {
+					log.Printf("publish to %s EOF", publishClient.Broker)
 					return
 				}
 				publishClient.Err = err
-				fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+				log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
 				return
 			}
 			if ackResp.Error != "" {
 				publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
-				fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+				log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
 				return
 			}
 			if ackResp.AckSequence > 0 {
-				log.Printf("ack %d", ackResp.AckSequence)
+				log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+			}
+			if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+				return
 			}
 		}
 	}()
 
 	publishCounter := 0
 	for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
+		if data.Ctrl != nil && data.Ctrl.IsClose {
+			// need to set this before sending to brokers, to avoid timing issue
+			atomic.StoreInt32(&hasMoreData, 0)
+		}
 		if err := publishClient.Send(&mq_pb.PublishMessageRequest{
 			Message: &mq_pb.PublishMessageRequest_Data{
 				Data: data,
@@ -191,14 +207,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
 			return fmt.Errorf("send publish data: %v", err)
 		}
 		publishCounter++
+		atomic.StoreInt64(&publishedTsNs, data.TsNs)
 	}
-
-	if err := publishClient.CloseSend(); err != nil {
-		return fmt.Errorf("close send: %v", err)
+	if publishCounter > 0 {
+		wg.Wait()
+	} else {
+		// CloseSend would cancel the context on the server side
+		if err := publishClient.CloseSend(); err != nil {
+			return fmt.Errorf("close send: %v", err)
+		}
 	}
 
-	time.Sleep(3 * time.Second)
-
 	log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
 
 	return nil
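The scheduler change replaces the old `CloseSend` plus a fixed 3-second sleep with a deterministic drain: the send loop records the `TsNs` of the last message it handed to `Send`, flips `hasMoreData` to 0 when it sees the close control message, and the ack-reader goroutine returns once the latest recorded timestamp has been acknowledged and the flag is down. Here is a self-contained model of that coordination, with a buffered channel standing in for the broker's ack stream — the names are illustrative, not the real scheduler types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// A broker ack-stream stand-in: acks are identified by message TsNs,
	// matching how the scheduler compares AckSequence against publishedTsNs.
	acks := make(chan int64, 16)

	var publishedTsNs int64 // TsNs of the last message handed to Send
	hasMoreData := int32(1) // flips to 0 when the close control is seen

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // stands in for the publishClient.Recv() ack loop
		defer wg.Done()
		for ackTsNs := range acks {
			// Same exit condition as the diff: the latest recorded publish
			// timestamp is acked and no more data will be sent.
			if atomic.LoadInt64(&publishedTsNs) <= ackTsNs &&
				atomic.LoadInt32(&hasMoreData) == 0 {
				return
			}
		}
	}()

	for ts := int64(1); ts <= 5; ts++ {
		if ts == 5 { // pretend the 5th message is ControlMessage{IsClose: true}
			// Flip the flag BEFORE sending, mirroring the diff's comment:
			// a fast ack must not race past an unset flag.
			atomic.StoreInt32(&hasMoreData, 0)
		}
		acks <- ts                            // the "broker" acks instantly here
		atomic.StoreInt64(&publishedTsNs, ts) // recorded after send, as in the diff
	}

	wg.Wait() // replaces CloseSend + 3s sleep: returns once drained
	fmt.Println("all sent messages acknowledged; stream drained")
}
```

Ordering is the crux: if the flag were set after the close message went out, an immediate broker ack could reach the reader while `hasMoreData` still reads 1, and the goroutine would never observe the exit condition.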
glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + err := sub.onEachPartition(assigned) if err != nil { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err) + glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) } - }(assigned.Partition, assigned.Broker) + }(assigned) } wg.Wait() } -func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error { +func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { // connect to the partition broker - return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ @@ -118,11 +118,12 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: sub.ContentConfig.Topic.ToPbTopic(), PartitionOffset: &mq_pb.PartitionOffset{ - Partition: partition, + Partition: assigned.Partition, StartTsNs: sub.alreadyProcessedTsNs, StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, }, Filter: sub.ContentConfig.Filter, + FollowerBrokers: assigned.FollowerBrokers, }, }, }) @@ -131,7 +132,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s return fmt.Errorf("create subscribe client: %v", err) } - glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) + glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) if sub.OnCompletionFunc != nil { defer sub.OnCompletionFunc() |
