aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
-rw-r--r--weed/util/log_buffer/log_read.go19
1 files changed, 12 insertions, 7 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 5a8e47070..99532b47b 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -17,10 +17,11 @@ var (
ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)
-func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
+func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64,
+ waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
- lastReadTime = startTreadTime
+ lastReadTime = startReadTime
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
@@ -35,10 +36,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
if err == ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
- return lastReadTime, ResumeFromDiskError
+ return lastReadTime, isDone, ResumeFromDiskError
}
// glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime)
if bytesBuf == nil {
+ if stopTsNs != 0 {
+ isDone = true
+ return
+ }
if waitForDataFn() {
continue
} else {
@@ -50,7 +55,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf))
batchSize := 0
- var startReadTime time.Time
for pos := 0; pos+4 < len(buf); {
@@ -68,10 +72,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
pos += 4 + int(size)
continue
}
- lastReadTime = time.Unix(0, logEntry.TsNs)
- if startReadTime.IsZero() {
- startReadTime = lastReadTime
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ isDone = true
+ return
}
+ lastReadTime = time.Unix(0, logEntry.TsNs)
if err = eachLogDataFn(logEntry); err != nil {
return