diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2022-05-30 21:38:31 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-05-30 21:38:31 -0700 |
| commit | ca01ce05249c336ed380d9f77efbee68213b8a37 (patch) | |
| tree | 2241d21d85b966d4957bae06ea09cdc4ad6bb534 /weed/util/log_buffer/log_read.go | |
| parent | 8902fa6ff653aa40249d2d6da49a9227b63415bb (diff) | |
| parent | 6adc42147f972d2adc223e119c0f97094b3f0bec (diff) | |
| download | seaweedfs-ca01ce05249c336ed380d9f77efbee68213b8a37.tar.xz seaweedfs-ca01ce05249c336ed380d9f77efbee68213b8a37.zip | |
Merge pull request #3122 from chrislusf/filer-sync-with-peers
Filer bootstrap from peers
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 5a8e47070..99532b47b 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,10 +17,11 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startTreadTime + lastReadTime = startReadTime defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -35,10 +36,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) if err == ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) - return lastReadTime, ResumeFromDiskError + return lastReadTime, isDone, ResumeFromDiskError } // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) if bytesBuf == nil { + if stopTsNs != 0 { + isDone = true + return + } if waitForDataFn() { continue } else { @@ -50,7 +55,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) batchSize := 0 - var startReadTime time.Time for pos := 0; pos+4 < len(buf); { @@ -68,10 +72,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime pos += 4 + int(size) continue } - lastReadTime = time.Unix(0, logEntry.TsNs) - if startReadTime.IsZero() { - startReadTime = lastReadTime + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + isDone = true + return } + lastReadTime = time.Unix(0, logEntry.TsNs) if err = eachLogDataFn(logEntry); err != nil { return |
