diff options
| author | chrislu <chris.lu@gmail.com> | 2022-05-30 22:47:29 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-05-30 22:47:29 -0700 |
| commit | f4a6da6cb276f1891b01097670b044fd4ee6139d (patch) | |
| tree | 8b92c13091e1d9973ce21e4f0c9ac143bd667cfd /weed/util/log_buffer/log_read.go | |
| parent | 596c3860cac83a75ae9ce728c8a043133c03d098 (diff) | |
| parent | ca01ce05249c336ed380d9f77efbee68213b8a37 (diff) | |
| download | seaweedfs-f4a6da6cb276f1891b01097670b044fd4ee6139d.tar.xz seaweedfs-f4a6da6cb276f1891b01097670b044fd4ee6139d.zip | |
Merge branch 'master' of https://github.com/chrislusf/seaweedfs
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 5a8e47070..99532b47b 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,10 +17,11 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startTreadTime + lastReadTime = startReadTime defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -35,10 +36,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) if err == ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) - return lastReadTime, ResumeFromDiskError + return lastReadTime, isDone, ResumeFromDiskError } // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) if bytesBuf == nil { + if stopTsNs != 0 { + isDone = true + return + } if waitForDataFn() { continue } else { @@ -50,7 +55,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) batchSize := 0 - var startReadTime time.Time for pos := 0; pos+4 < len(buf); { @@ -68,10 +72,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime pos += 4 + int(size) continue } - lastReadTime = time.Unix(0, logEntry.TsNs) - if startReadTime.IsZero() { - startReadTime = lastReadTime + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + isDone = true + return } + lastReadTime = time.Unix(0, logEntry.TsNs) if err = eachLogDataFn(logEntry); err != nil { return |
