aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-28 22:04:42 -0800
committerchrislu <chris.lu@gmail.com>2024-01-28 22:04:42 -0800
commit0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4 (patch)
tree91e39430c0992ab89dc1f1d2f4f61fb0f7e6c4fe
parent545d5fbdf6308512cfc3833cdba8539859d496c4 (diff)
downloadseaweedfs-0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4.tar.xz
seaweedfs-0b2e5ddc7ca6e9a75c50fd2c1c2eebd899af3fc4.zip
wait 3 seconds before shutting down publish client, to wait for all messages to be received
-rw-r--r--weed/mq/broker/broker_grpc_pub.go20
-rw-r--r--weed/mq/client/pub_client/scheduler.go21
2 files changed, 29 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index e4861e9bc..17d01f620 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -7,10 +7,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
+ "io"
"math/rand"
"net"
"sync/atomic"
- "time"
)
// PUB
@@ -75,14 +75,17 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
respChan := make(chan *mq_pb.PublishMessageResponse, 128)
defer func() {
atomic.StoreInt32(&isStopping, 1)
+ respChan <- &mq_pb.PublishMessageResponse{
+ AckSequence: ackSequence,
+ }
close(respChan)
localTopicPartition.Publishers.RemovePublisher(clientName)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveTopicPartition(t, p)
}
+ glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence)
}()
go func() {
- ticker := time.NewTicker(1 * time.Second)
for {
select {
case resp := <-respChan:
@@ -93,15 +96,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
} else {
return
}
- case <-ticker.C:
- if atomic.LoadInt32(&isStopping) == 0 {
- response := &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- }
- respChan <- response
- } else {
- return
- }
case <-localTopicPartition.StopPublishersCh:
respChan <- &mq_pb.PublishMessageResponse{
AckSequence: ackSequence,
@@ -116,6 +110,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// receive a message
req, err := stream.Recv()
if err != nil {
+ if err == io.EOF {
+ break
+ }
+ glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
return err
}
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index c24ac0384..89d131580 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -158,7 +158,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
go func() {
for {
- _, err := publishClient.Recv()
+ ackResp, err := publishClient.Recv()
if err != nil {
e, ok := status.FromError(err)
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
@@ -168,9 +168,18 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
return
}
+ if ackResp.Error != "" {
+ publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ return
+ }
+ if ackResp.AckSequence > 0 {
+ log.Printf("ack %d", ackResp.AckSequence)
+ }
}
}()
+ publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
@@ -179,7 +188,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}); err != nil {
return fmt.Errorf("send publish data: %v", err)
}
+ publishCounter++
+ }
+
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
}
+
+ time.Sleep(3 * time.Second)
+
+ log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
+
return nil
}