diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/util/log_buffer/log_read.go | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go new file mode 100644 index 000000000..2b73a8064 --- /dev/null +++ b/weed/util/log_buffer/log_read.go @@ -0,0 +1,77 @@ +package log_buffer + +import ( + "bytes" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (logBuffer *LogBuffer) LoopProcessLogData( + startTreadTime time.Time, + waitForDataFn func() bool, + eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) { + // loop through all messages + var bytesBuf *bytes.Buffer + lastReadTime := startTreadTime + defer func() { + if bytesBuf != nil { + logBuffer.ReleaseMeory(bytesBuf) + } + }() + + for { + + if bytesBuf != nil { + logBuffer.ReleaseMeory(bytesBuf) + } + bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) + // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) + if bytesBuf == nil { + if waitForDataFn() { + continue + } else { + return + } + } + + buf := bytesBuf.Bytes() + // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf)) + + batchSize := 0 + var startReadTime time.Time + + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + entryData := buf[pos+4 : pos+4+int(size)] + + // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf)) + + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + pos += 4 + int(size) + continue + } + lastReadTime = time.Unix(0, logEntry.TsNs) + if startReadTime.IsZero() { + startReadTime = lastReadTime + } + + if err = eachLogDataFn(logEntry); err != nil { + return + } + + pos += 4 + int(size) + batchSize++ + } + + // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize) + } + +} |
