diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-27 22:48:37 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-27 22:48:37 -0700 |
| commit | 7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b (patch) | |
| tree | 03aad1a456c3cdb8789e88dc5dab079379caaf13 | |
| parent | 5cc94a05b93c5f13d02bd091ff450f9e8274c5ea (diff) | |
| download | seaweedfs-7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b.tar.xz seaweedfs-7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b.zip | |
refactor
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 12 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 18 |
2 files changed, 18 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f8554ea5b..1253420ac 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -153,18 +153,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition.Publishers.RemovePublisher(clientName) glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) if localTopicPartition.MaybeShutdownLocalPartition() { - if localTopicPartition.FollowerStream != nil { - // send close to the follower - if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Close{ - Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, - }, - }); followErr != nil { - glog.Errorf("Error closing follower stream: %v", followErr) - } - println("closing grpcConnection to follower") - localTopicPartition.FollowerGrpcConnection.Close() - } b.localTopicManager.RemoveTopicPartition(t, p) glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 34c2903f4..a6562fd5c 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -129,6 +129,24 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { + if p.MaybeShutdownLocalPartition() { + if p.FollowerStream != nil { + // send close to the follower + if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + glog.Errorf("Error closing follower stream: %v", followErr) + } + println("closing grpcConnection to follower") + p.FollowerGrpcConnection.Close() + } + } + return +} + +func (p *LocalPartition) canShutdownLocalPartition() (hasShutdown bool) { if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { p.LogBuffer.ShutdownLogBuffer() hasShutdown = true |
