aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/util/log_buffer/log_read.go
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
-rw-r--r--weed/util/log_buffer/log_read.go89
1 files changed, 89 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
new file mode 100644
index 000000000..d6917abfe
--- /dev/null
+++ b/weed/util/log_buffer/log_read.go
@@ -0,0 +1,89 @@
+package log_buffer
+
+import (
+ "bytes"
+ "fmt"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ ResumeError = fmt.Errorf("resume")
+ ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
+)
+
+func (logBuffer *LogBuffer) LoopProcessLogData(
+ startTreadTime time.Time,
+ waitForDataFn func() bool,
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
+ // loop through all messages
+ var bytesBuf *bytes.Buffer
+ lastReadTime = startTreadTime
+ defer func() {
+ if bytesBuf != nil {
+ logBuffer.ReleaseMemory(bytesBuf)
+ }
+ }()
+
+ for {
+
+ if bytesBuf != nil {
+ logBuffer.ReleaseMemory(bytesBuf)
+ }
+ bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
+ if err == ResumeFromDiskError {
+ return lastReadTime, ResumeFromDiskError
+ }
+ // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
+ if bytesBuf == nil {
+ if waitForDataFn() {
+ continue
+ } else {
+ return
+ }
+ }
+
+ buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf))
+
+ batchSize := 0
+ var startReadTime time.Time
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = ResumeError
+ glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf))
+ return
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+ lastReadTime = time.Unix(0, logEntry.TsNs)
+ if startReadTime.IsZero() {
+ startReadTime = lastReadTime
+ }
+
+ if err = eachLogDataFn(logEntry); err != nil {
+ return
+ }
+
+ pos += 4 + int(size)
+ batchSize++
+ }
+
+ // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
+ }
+
+}