aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/pub_client/scheduler.go23
1 files changed, 16 insertions, 7 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 80a1ac9ef..e91127522 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
"log"
"sort"
"sync"
+ "sync/atomic"
"time"
)
@@ -159,6 +160,8 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("init response error: %v", resp.Error)
}
+ var publishedTsNs int64
+ hasMoreData := int32(1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -183,6 +186,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if ackResp.AckSequence > 0 {
log.Printf("ack %d", ackResp.AckSequence)
}
+ if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ return
+ }
}
}()
@@ -196,14 +202,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("send publish data: %v", err)
}
publishCounter++
+ atomic.StoreInt64(&publishedTsNs, data.TsNs)
+ }
+ atomic.StoreInt32(&hasMoreData, 0)
+ if publishCounter > 0 {
+ wg.Wait()
+ } else {
+ // CloseSend would cancel the context on the server side
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
+ }
}
-
- // CloseSend would cancel the context on the server side
- //if err := publishClient.CloseSend(); err != nil {
- // return fmt.Errorf("close send: %v", err)
- //}
-
- wg.Wait()
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)