aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go38
1 files changed, 19 insertions, 19 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 53324d846..9bedc5a15 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -179,28 +179,28 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
- if p.canShutdownLocalPartition() {
- if p.FollowerStream != nil {
- // send close to the follower
- if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
- Message: &mq_pb.PublishFollowMeRequest_Close{
- Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
- },
- }); followErr != nil {
- glog.Errorf("Error closing follower stream: %v", followErr)
- }
- println("closing grpcConnection to follower")
- p.FollowerGrpcConnection.Close()
- }
+ if !p.Publishers.IsEmpty() {
+ return
}
- return
-}
+ if !p.Subscribers.IsEmpty() {
+ return
+ }
+ p.LogBuffer.ShutdownLogBuffer()
-func (p *LocalPartition) canShutdownLocalPartition() (hasShutdown bool) {
- if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
- p.LogBuffer.ShutdownLogBuffer()
- hasShutdown = true
+ if p.FollowerStream != nil {
+ // send close to the follower
+ if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ glog.Errorf("Error closing follower stream: %v", followErr)
+ }
+ glog.V(4).Infof("closing grpcConnection to follower")
+ p.FollowerGrpcConnection.Close()
+ p.FollowerStream = nil
}
+
return
}