aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-01 16:01:26 -0700
committerchrislu <chris.lu@gmail.com>2024-04-01 16:01:26 -0700
commitf07875e8e10cc39e789931c56695a9dda8e884df (patch)
treefd6925952625d210a8c15f2a18e4f8b317249e86 /weed/util
parente568e742c94f905c619af73c35559895ff4e79d7 (diff)
downloadseaweedfs-f07875e8e10cc39e789931c56695a9dda8e884df.tar.xz
seaweedfs-f07875e8e10cc39e789931c56695a9dda8e884df.zip
send flush message to follower before shutting down logBuffer
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/log_buffer/log_buffer.go8
1 files changed, 8 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index fa956317e..65d20a757 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -43,6 +43,7 @@ type LogBuffer struct {
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
isStopping *atomic.Bool
+ isAllFlushed bool
flushChan chan *dataToFlush
LastTsNs int64
sync.RWMutex
@@ -134,6 +135,7 @@ func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
+// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
if isAlreadyStopped {
@@ -144,6 +146,11 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() {
close(logBuffer.flushChan)
}
+// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
+func (logBuffer *LogBuffer) IsAllFlushed() bool {
+ return logBuffer.isAllFlushed
+}
+
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
@@ -154,6 +161,7 @@ func (logBuffer *LogBuffer) loopFlush() {
logBuffer.lastFlushDataTime = d.stopTime
}
}
+ logBuffer.isAllFlushed = true
}
func (logBuffer *LogBuffer) loopInterval() {