aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-28 22:04:42 -0800
committerchrislu <chris.lu@gmail.com>2024-01-28 22:04:42 -0800
commit0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4 (patch)
tree91e39430c0992ab89dc1f1d2f4f61fb0f7e6c4fe /weed/mq/client
parent545d5fbdf6308512cfc3833cdba8539859d496c4 (diff)
downloadseaweedfs-0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4.tar.xz
seaweedfs-0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4.zip
wait 3 seconds before shutting down publish client, to wait for all messages to be received
Diffstat (limited to 'weed/mq/client')
-rw-r--r--weed/mq/client/pub_client/scheduler.go21
1 files changed, 20 insertions, 1 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index c24ac0384..89d131580 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -158,7 +158,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
go func() {
for {
- _, err := publishClient.Recv()
+ ackResp, err := publishClient.Recv()
if err != nil {
e, ok := status.FromError(err)
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
@@ -168,9 +168,18 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
return
}
+ if ackResp.Error != "" {
+ publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ return
+ }
+ if ackResp.AckSequence > 0 {
+ log.Printf("ack %d", ackResp.AckSequence)
+ }
}
}()
+ publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
@@ -179,7 +188,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}); err != nil {
return fmt.Errorf("send publish data: %v", err)
}
+ publishCounter++
+ }
+
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
}
+
+ time.Sleep(3 * time.Second)
+
+ log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
+
return nil
}