aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-01 16:01:26 -0700
committerchrislu <chris.lu@gmail.com>2024-04-01 16:01:26 -0700
commitf07875e8e10cc39e789931c56695a9dda8e884df (patch)
treefd6925952625d210a8c15f2a18e4f8b317249e86 /weed
parente568e742c94f905c619af73c35559895ff4e79d7 (diff)
downloadseaweedfs-f07875e8e10cc39e789931c56695a9dda8e884df.tar.xz
seaweedfs-f07875e8e10cc39e789931c56695a9dda8e884df.zip
send flush message to follower before shutting down logBuffer
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go4
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go10
-rw-r--r--weed/mq/topic/local_partition.go20
-rw-r--r--weed/util/log_buffer/log_buffer.go8
4 files changed, 41 insertions, 1 deletions
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 3e7977eba..e5488a13a 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -43,6 +43,10 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
} else if closeMessage := req.GetClose(); closeMessage != nil {
glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
break
+ } else if flushMessage := req.GetFlush(); flushMessage != nil {
+ glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
+ } else {
+ glog.Errorf("unknown message: %v", req)
}
}
return nil
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index a058d8da5..50470f879 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -41,6 +41,16 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
}
atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
+
+ b.accessLock.Lock()
+ defer b.accessLock.Unlock()
+ p := topic.FromPbPartition(partition)
+ if localPartition:=b.localTopicManager.GetLocalPartition(t, p); localPartition!=nil {
+ localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
+ }
+
+ println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf))
+
}
}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 895faf596..6e429e5df 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -190,6 +190,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
+ p.LogBuffer.ShutdownLogBuffer()
+ for !p.LogBuffer.IsAllFlushed() {
+ time.Sleep(113 * time.Millisecond)
+ }
if p.followerStream != nil {
// send close to the follower
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
@@ -205,7 +209,6 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
p.follower = ""
}
- p.LogBuffer.ShutdownLogBuffer()
hasShutdown = true
}
@@ -219,3 +222,18 @@ func (p *LocalPartition) Shutdown() {
p.LogBuffer.ShutdownLogBuffer()
glog.V(0).Infof("local partition %v shutting down", p.Partition)
}
+
+func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
+ if p.followerStream != nil {
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Flush{
+ Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
+ TsNs: flushTsNs,
+ },
+ },
+ }); followErr != nil {
+ glog.Errorf("send follower %s flush message: %v", p.follower, followErr)
+ }
+ println("notifying", p.follower, "flushed at", flushTsNs)
+ }
+}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index fa956317e..65d20a757 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -43,6 +43,7 @@ type LogBuffer struct {
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
isStopping *atomic.Bool
+ isAllFlushed bool
flushChan chan *dataToFlush
LastTsNs int64
sync.RWMutex
@@ -134,6 +135,7 @@ func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
+// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
if isAlreadyStopped {
@@ -144,6 +146,11 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() {
close(logBuffer.flushChan)
}
+// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
+func (logBuffer *LogBuffer) IsAllFlushed() bool {
+ return logBuffer.isAllFlushed
+}
+
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
@@ -154,6 +161,7 @@ func (logBuffer *LogBuffer) loopFlush() {
logBuffer.lastFlushDataTime = d.stopTime
}
}
+ logBuffer.isAllFlushed = true
}
func (logBuffer *LogBuffer) loopInterval() {