diff options
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index e6caf896c..e92e07ab5 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -12,6 +12,7 @@ import ( "log" "sort" "sync" + "sync/atomic" "time" ) @@ -145,11 +146,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro Partition: job.Partition, AckInterval: 128, FollowerBrokers: job.FollowerBrokers, + PublisherName: p.config.PublisherName, }, }, }); err != nil { return fmt.Errorf("send init message: %v", err) } + // process the hello message resp, err := stream.Recv() if err != nil { return fmt.Errorf("recv init response: %v", err) @@ -158,31 +161,44 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("init response error: %v", resp.Error) } + var publishedTsNs int64 + hasMoreData := int32(1) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() for { ackResp, err := publishClient.Recv() if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Unknown && e.Message() == "EOF" { + log.Printf("publish to %s EOF", publishClient.Broker) return } publishClient.Err = err - fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) + log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) return } if ackResp.Error != "" { publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) - fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) + log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) return } if ackResp.AckSequence > 0 { - log.Printf("ack %d", ackResp.AckSequence) + log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) + } + if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + return } } }() publishCounter := 0 for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { + if data.Ctrl != nil && data.Ctrl.IsClose { + // need to set this before sending to brokers, to avoid timing issue + atomic.StoreInt32(&hasMoreData, 0) + } if err := publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Data{ Data: data, @@ -191,14 +207,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("send publish data: %v", err) } publishCounter++ + atomic.StoreInt64(&publishedTsNs, data.TsNs) } - - if err := publishClient.CloseSend(); err != nil { - return fmt.Errorf("close send: %v", err) + if publishCounter > 0 { + wg.Wait() + } else { + // CloseSend would cancel the context on the server side + if err := publishClient.CloseSend(); err != nil { + return fmt.Errorf("close send: %v", err) + } } - time.Sleep(3 * time.Second) - log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) return nil |
