diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-28 22:04:42 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-28 22:04:42 -0800 |
| commit | 0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4 (patch) | |
| tree | 91e39430c0992ab89dc1f1d2f4f61fb0f7e6c4fe /weed/mq/client | |
| parent | 545d5fbdf6308512cfc3833cdba8539859d496c4 (diff) | |
| download | seaweedfs-0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4.tar.xz seaweedfs-0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4.zip | |
wait 3 seconds before shutting down publish client, to wait for all messages to be received
Diffstat (limited to 'weed/mq/client')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index c24ac0384..89d131580 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -158,7 +158,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro go func() { for { - _, err := publishClient.Recv() + ackResp, err := publishClient.Recv() if err != nil { e, ok := status.FromError(err) if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { @@ -168,9 +168,18 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) return } + if ackResp.Error != "" { + publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) + fmt.Printf("publish to %s error: %v\n", publishClient.Broker, ackResp.Error) + return + } + if ackResp.AckSequence > 0 { + log.Printf("ack %d", ackResp.AckSequence) + } } }() + publishCounter := 0 for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { if err := publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Data{ @@ -179,7 +188,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro }); err != nil { return fmt.Errorf("send publish data: %v", err) } + publishCounter++ + } + + if err := publishClient.CloseSend(); err != nil { + return fmt.Errorf("close send: %v", err) } + + time.Sleep(3 * time.Second) + + log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) + return nil } |
