diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-09-09 11:21:23 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-09-09 11:21:23 -0700 |
| commit | 387ab6796f274151f802ccdab8756b959b5fb1cb (patch) | |
| tree | a3b95f5bdba66f12c609b5e53b262b011a47a450 /weed/util/log_buffer/log_read.go | |
| parent | 4fc0bd1a8173e284ff919edb5214f5adf7a90f06 (diff) | |
| download | seaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.tar.xz seaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.zip | |
filer: cross cluster synchronization
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index f0486ac46..57f4b0115 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -2,6 +2,7 @@ package log_buffer import ( "bytes" + "fmt" "time" "github.com/golang/protobuf/proto" @@ -11,13 +12,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +var ( + ResumeError = fmt.Errorf("resume") +) + func (logBuffer *LogBuffer) LoopProcessLogData( startTreadTime time.Time, waitForDataFn func() bool, - eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) { + eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime := startTreadTime + lastReadTime = startTreadTime defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -48,10 +53,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData( for pos := 0; pos+4 < len(buf); { size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + err = ResumeError + glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf)) + return + } entryData := buf[pos+4 : pos+4+int(size)] - // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf)) - logEntry := &filer_pb.LogEntry{} if err = proto.Unmarshal(entryData, logEntry); err != nil { glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) |
