aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
-rw-r--r--weed/mq/client/pub_client/scheduler.go35
1 files changed, 27 insertions, 8 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e6caf896c..e92e07ab5 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
"log"
"sort"
"sync"
+ "sync/atomic"
"time"
)
@@ -145,11 +146,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Partition: job.Partition,
AckInterval: 128,
FollowerBrokers: job.FollowerBrokers,
+ PublisherName: p.config.PublisherName,
},
},
}); err != nil {
return fmt.Errorf("send init message: %v", err)
}
+ // process the hello message
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv init response: %v", err)
@@ -158,31 +161,44 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("init response error: %v", resp.Error)
}
+ var publishedTsNs int64
+ hasMoreData := int32(1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
ackResp, err := publishClient.Recv()
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Printf("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d", ackResp.AckSequence)
+ log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ }
+ if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ return
}
}
}()
publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
+ if data.Ctrl != nil && data.Ctrl.IsClose {
+ // need to set this before sending to brokers, to avoid timing issue
+ atomic.StoreInt32(&hasMoreData, 0)
+ }
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: data,
@@ -191,14 +207,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("send publish data: %v", err)
}
publishCounter++
+ atomic.StoreInt64(&publishedTsNs, data.TsNs)
}
-
- if err := publishClient.CloseSend(); err != nil {
- return fmt.Errorf("close send: %v", err)
+ if publishCounter > 0 {
+ wg.Wait()
+ } else {
+ // CloseSend would cancel the context on the server side
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
+ }
}
- time.Sleep(3 * time.Second)
-
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil