diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-21 09:08:53 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-21 09:08:53 -0700 |
| commit | da31e9b939ed38fdd7c053eeaa9a7da06c5c936f (patch) | |
| tree | 9ad7bb1833ce6695dded8a1538abf5da10e14bd6 /weed | |
| parent | e641d49f9f995b5648303e8d005d401e7c75c6d2 (diff) | |
| download | seaweedfs-da31e9b939ed38fdd7c053eeaa9a7da06c5c936f.tar.xz seaweedfs-da31e9b939ed38fdd7c053eeaa9a7da06c5c936f.zip | |
correctly wait for the publisher to finish
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 80a1ac9ef..e91127522 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -12,6 +12,7 @@ import ( "log" "sort" "sync" + "sync/atomic" "time" ) @@ -159,6 +160,8 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("init response error: %v", resp.Error) } + var publishedTsNs int64 + hasMoreData := int32(1) var wg sync.WaitGroup wg.Add(1) go func() { @@ -183,6 +186,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if ackResp.AckSequence > 0 { log.Printf("ack %d", ackResp.AckSequence) } + if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + return + } } }() @@ -196,14 +202,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("send publish data: %v", err) } publishCounter++ + atomic.StoreInt64(&publishedTsNs, data.TsNs) + } + atomic.StoreInt32(&hasMoreData, 0) + if publishCounter > 0 { + wg.Wait() + } else { + // CloseSend would cancel the context on the server side + if err := publishClient.CloseSend(); err != nil { + return fmt.Errorf("close send: %v", err) + } } - - // CloseSend would cancel the context on the server side - //if err := publishClient.CloseSend(); err != nil { - // return fmt.Errorf("close send: %v", err) - //} - - wg.Wait() log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) |
