aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-07 10:50:09 -0800
committerchrislu <chris.lu@gmail.com>2024-03-07 10:50:09 -0800
commit34f2b600ac5d4850e5f4f6d9d0ce0273150542cf (patch)
tree7c7592ea7415a85b0afba5ca00172156af041ca1 /weed/util/log_buffer/log_read.go
parent62397f23715062b6e8e710568dc8f88b0bab50d8 (diff)
downloadseaweedfs-34f2b600ac5d4850e5f4f6d9d0ce0273150542cf.tar.xz
seaweedfs-34f2b600ac5d4850e5f4f6d9d0ce0273150542cf.zip
each log function adds a "done" return parameter
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
-rw-r--r--weed/util/log_buffer/log_read.go8
1 files changed, 6 insertions, 2 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 8a4d2d851..5529a6691 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -30,7 +30,7 @@ func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition {
}
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64,
- waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadPosition MessagePosition, isDone bool, err error) {
+ waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
var batchIndex int64
@@ -69,6 +69,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if waitForDataFn() {
continue
} else {
+ isDone = true
return
}
}
@@ -101,10 +102,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
- if err = eachLogDataFn(logEntry); err != nil {
+ if isDone, err = eachLogDataFn(logEntry); err != nil {
glog.Errorf("LoopProcessLogData: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
return
}
+ if isDone {
+ return
+ }
pos += 4 + int(size)
batchSize++