diff options
| author | chrislu <chris.lu@gmail.com> | 2024-04-01 16:01:26 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-04-01 16:01:26 -0700 |
| commit | f07875e8e10cc39e789931c56695a9dda8e884df (patch) | |
| tree | fd6925952625d210a8c15f2a18e4f8b317249e86 /weed/mq/topic/local_partition.go | |
| parent | e568e742c94f905c619af73c35559895ff4e79d7 (diff) | |
| download | seaweedfs-f07875e8e10cc39e789931c56695a9dda8e884df.tar.xz seaweedfs-f07875e8e10cc39e789931c56695a9dda8e884df.zip | |
send flush message to follower before shutting down logBuffer
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 895faf596..6e429e5df 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -190,6 +190,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 { + p.LogBuffer.ShutdownLogBuffer() + for !p.LogBuffer.IsAllFlushed() { + time.Sleep(113 * time.Millisecond) + } if p.followerStream != nil { // send close to the follower if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ @@ -205,7 +209,6 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { p.follower = "" } - p.LogBuffer.ShutdownLogBuffer() hasShutdown = true } @@ -219,3 +222,18 @@ func (p *LocalPartition) Shutdown() { p.LogBuffer.ShutdownLogBuffer() glog.V(0).Infof("local partition %v shutting down", p.Partition) } + +func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { + if p.followerStream != nil { + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Flush{ + Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ + TsNs: flushTsNs, + }, + }, + }); followErr != nil { + glog.Errorf("send follower %s flush message: %v", p.follower, followErr) + } + println("notifying", p.follower, "flushed at", flushTsNs) + } +} |
