aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go16
1 files changed, 16 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 8683dfffc..15ea062c6 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -24,6 +24,7 @@ type dataToFlush struct {
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
+type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
@@ -63,6 +64,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
notifyFn: notifyFn,
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
+ batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts
}
go lb.loopFlush()
go lb.loopInterval()
@@ -343,6 +345,20 @@ func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
bufferPool.Put(b)
}
+// GetName returns the log buffer name for metadata tracking
+func (logBuffer *LogBuffer) GetName() string {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+ return logBuffer.name
+}
+
+// GetBatchIndex returns the current batch index for metadata tracking
+func (logBuffer *LogBuffer) GetBatchIndex() int64 {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+ return logBuffer.batchIndex
+}
+
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)