aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
-rw-r--r--weed/util/log_buffer/log_read.go102
1 files changed, 102 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index cf83de1e5..0ebcc7cc9 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -130,3 +130,105 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
}
+
+// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback
+func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64,
+ waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+ // loop through all messages
+ var bytesBuf *bytes.Buffer
+ var batchIndex int64
+ lastReadPosition = startPosition
+ var entryCounter int64
+ defer func() {
+ if bytesBuf != nil {
+ logBuffer.ReleaseMemory(bytesBuf)
+ }
+ // println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter)
+ }()
+
+ for {
+
+ if bytesBuf != nil {
+ logBuffer.ReleaseMemory(bytesBuf)
+ }
+ bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
+ if err == ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ return lastReadPosition, isDone, ResumeFromDiskError
+ }
+ readSize := 0
+ if bytesBuf != nil {
+ readSize = bytesBuf.Len()
+ }
+ glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+ if bytesBuf == nil {
+ if batchIndex >= 0 {
+ lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+ }
+ if stopTsNs != 0 {
+ isDone = true
+ return
+ }
+ lastTsNs := logBuffer.LastTsNs.Load()
+
+ for lastTsNs == logBuffer.LastTsNs.Load() {
+ if waitForDataFn() {
+ continue
+ } else {
+ isDone = true
+ return
+ }
+ }
+ if logBuffer.IsStopping() {
+ isDone = true
+ return
+ }
+ continue
+ }
+
+ buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
+
+ batchSize := 0
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = ResumeError
+ glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
+ return
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ isDone = true
+ // println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
+ return
+ }
+ lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
+
+ if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil {
+ glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
+ return
+ }
+ if isDone {
+ glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1)
+ return
+ }
+
+ pos += 4 + int(size)
+ batchSize++
+ entryCounter++
+
+ }
+
+ }
+
+}