aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-27 22:48:37 -0700
committerchrislu <chris.lu@gmail.com>2024-03-27 22:48:37 -0700
commit7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b (patch)
tree03aad1a456c3cdb8789e88dc5dab079379caaf13
parent5cc94a05b93c5f13d02bd091ff450f9e8274c5ea (diff)
downloadseaweedfs-7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b.tar.xz
seaweedfs-7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b.zip
refactor
-rw-r--r--weed/mq/broker/broker_grpc_pub.go12
-rw-r--r--weed/mq/topic/local_partition.go18
2 files changed, 18 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index f8554ea5b..1253420ac 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -153,18 +153,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition.Publishers.RemovePublisher(clientName)
glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
if localTopicPartition.MaybeShutdownLocalPartition() {
- if localTopicPartition.FollowerStream != nil {
- // send close to the follower
- if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
- Message: &mq_pb.PublishFollowMeRequest_Close{
- Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
- },
- }); followErr != nil {
- glog.Errorf("Error closing follower stream: %v", followErr)
- }
- println("closing grpcConnection to follower")
- localTopicPartition.FollowerGrpcConnection.Close()
- }
b.localTopicManager.RemoveTopicPartition(t, p)
glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 34c2903f4..a6562fd5c 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -129,6 +129,24 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
}
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
+ if p.MaybeShutdownLocalPartition() {
+ if p.FollowerStream != nil {
+ // send close to the follower
+ if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ glog.Errorf("Error closing follower stream: %v", followErr)
+ }
+ println("closing grpcConnection to follower")
+ p.FollowerGrpcConnection.Close()
+ }
+ }
+ return
+}
+
+func (p *LocalPartition) canShutdownLocalPartition() (hasShutdown bool) {
if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
p.LogBuffer.ShutdownLogBuffer()
hasShutdown = true