aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/topic/local_partition.go36
1 files changed, 18 insertions, 18 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 8b9970f20..a9f861710 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -182,26 +182,26 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
- if !p.Publishers.IsEmpty() {
- return
- }
- if !p.Subscribers.IsEmpty() {
- return
- }
- p.LogBuffer.ShutdownLogBuffer()
- if p.followerStream != nil {
- // send close to the follower
- if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
- Message: &mq_pb.PublishFollowMeRequest_Close{
- Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
- },
- }); followErr != nil {
- glog.Errorf("Error closing follower stream: %v", followErr)
+ if p.Publishers.IsEmpty() {
+ if p.followerStream != nil {
+ // send close to the follower
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ glog.Errorf("Error closing follower stream: %v", followErr)
+ }
+ glog.V(4).Infof("closing grpcConnection to follower")
+ p.followerGrpcConnection.Close()
+ p.followerStream = nil
}
- glog.V(4).Infof("closing grpcConnection to follower")
- p.followerGrpcConnection.Close()
- p.followerStream = nil
+ }
+
+ if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
+ p.LogBuffer.ShutdownLogBuffer()
+ hasShutdown = true
}
return