about summary refs log tree commit diff
path: root/weed/util/log_buffer/log_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_read.go')
-rw-r--r--  weed/util/log_buffer/log_read.go  233
1 files changed, 200 insertions(+), 33 deletions(-)
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 0ebcc7cc9..77f03ddb8 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -18,19 +18,43 @@ var (
)
type MessagePosition struct {
- time.Time // this is the timestamp of the message
- BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch.
+ Time time.Time // timestamp of the message
+ Offset int64 // Kafka offset for offset-based positioning, or batch index for timestamp-based
+ IsOffsetBased bool // true if this position is offset-based, false if timestamp-based
}
-func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition {
+func NewMessagePosition(tsNs int64, offset int64) MessagePosition {
return MessagePosition{
- Time: time.Unix(0, tsNs).UTC(),
- BatchIndex: batchIndex,
+ Time: time.Unix(0, tsNs).UTC(),
+ Offset: offset,
+ IsOffsetBased: false, // timestamp-based by default
}
}
+// NewMessagePositionFromOffset creates a MessagePosition that represents a specific offset
+func NewMessagePositionFromOffset(offset int64) MessagePosition {
+ return MessagePosition{
+ Time: time.Time{}, // Zero time for offset-based positions
+ Offset: offset,
+ IsOffsetBased: true,
+ }
+}
+
+// GetOffset extracts the offset from an offset-based MessagePosition
+func (mp MessagePosition) GetOffset() int64 {
+ if !mp.IsOffsetBased {
+ return -1 // Not an offset-based position
+ }
+ return mp.Offset // Offset is stored directly
+}
+
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64,
waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+
+ // Register for instant notifications (<1ms latency)
+ notifyChan := logBuffer.RegisterSubscriber(readerName)
+ defer logBuffer.UnregisterSubscriber(readerName)
+
// loop through all messages
var bytesBuf *bytes.Buffer
var batchIndex int64
@@ -57,10 +81,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if bytesBuf != nil {
readSize = bytesBuf.Len()
}
- glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+ glog.V(4).Infof("%s ReadFromBuffer at %v offset %d. Read bytes %v batchIndex %d", readerName, lastReadPosition, lastReadPosition.Offset, readSize, batchIndex)
if bytesBuf == nil {
if batchIndex >= 0 {
- lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+ lastReadPosition = NewMessagePosition(lastReadPosition.Time.UnixNano(), batchIndex)
}
if stopTsNs != 0 {
isDone = true
@@ -69,12 +93,23 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
lastTsNs := logBuffer.LastTsNs.Load()
for lastTsNs == logBuffer.LastTsNs.Load() {
- if waitForDataFn() {
- continue
- } else {
+ if !waitForDataFn() {
isDone = true
return
}
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, break and retry read
+ glog.V(3).Infof("%s: Woke up from notification (LoopProcessLogData)", readerName)
+ break
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, check if timestamp changed
+ if lastTsNs != logBuffer.LastTsNs.Load() {
+ break
+ }
+ glog.V(4).Infof("%s: Notification timeout (LoopProcessLogData), polling", readerName)
+ }
}
if logBuffer.IsStopping() {
isDone = true
@@ -104,6 +139,18 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
pos += 4 + int(size)
continue
}
+
+ // Handle offset-based filtering for offset-based start positions
+ if startPosition.IsOffsetBased {
+ startOffset := startPosition.GetOffset()
+ if logEntry.Offset < startOffset {
+ // Skip entries before the starting offset
+ pos += 4 + int(size)
+ batchSize++
+ continue
+ }
+ }
+
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
isDone = true
// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
@@ -131,63 +178,163 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
-// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback
-func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64,
- waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+// LoopProcessLogDataWithOffset is similar to LoopProcessLogData but provides offset to the callback
+func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, startPosition MessagePosition, stopTsNs int64,
+ waitForDataFn func() bool, eachLogDataFn EachLogEntryWithOffsetFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+ glog.V(4).Infof("LoopProcessLogDataWithOffset started for %s, startPosition=%v", readerName, startPosition)
+
+ // Register for instant notifications (<1ms latency)
+ notifyChan := logBuffer.RegisterSubscriber(readerName)
+ defer logBuffer.UnregisterSubscriber(readerName)
+
// loop through all messages
var bytesBuf *bytes.Buffer
- var batchIndex int64
+ var offset int64
lastReadPosition = startPosition
var entryCounter int64
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
- // println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter)
+ // println("LoopProcessLogDataWithOffset", readerName, "sent messages total", entryCounter)
}()
for {
+ // Check stopTsNs at the beginning of each iteration
+ // This ensures we exit immediately if the stop time is in the past
+ if stopTsNs != 0 && time.Now().UnixNano() > stopTsNs {
+ isDone = true
+ return
+ }
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
- bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
+ bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition)
+ glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err)
if err == ResumeFromDiskError {
- time.Sleep(1127 * time.Millisecond)
- return lastReadPosition, isDone, ResumeFromDiskError
+ // Try to read from disk if readFromDiskFn is available
+ if logBuffer.ReadFromDiskFn != nil {
+ // Wrap eachLogDataFn to match the expected signature
+ diskReadFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ return eachLogDataFn(logEntry, logEntry.Offset)
+ }
+ lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, diskReadFn)
+ if err != nil {
+ return lastReadPosition, isDone, err
+ }
+ if isDone {
+ return lastReadPosition, isDone, nil
+ }
+ // Continue to next iteration after disk read
+ }
+
+ // CRITICAL: Check if client is still connected after disk read
+ if !waitForDataFn() {
+ // Client disconnected - exit cleanly
+ glog.V(4).Infof("%s: Client disconnected after disk read", readerName)
+ return lastReadPosition, true, nil
+ }
+
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, retry immediately
+ glog.V(3).Infof("%s: Woke up from notification after disk read", readerName)
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, retry anyway (fallback for edge cases)
+ glog.V(4).Infof("%s: Notification timeout, polling", readerName)
+ }
+
+ // Continue to next iteration (don't return ResumeFromDiskError)
+ continue
}
readSize := 0
if bytesBuf != nil {
readSize = bytesBuf.Len()
}
- glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+ glog.V(4).Infof("%s ReadFromBuffer at %v posOffset %d. Read bytes %v bufferOffset %d", readerName, lastReadPosition, lastReadPosition.Offset, readSize, offset)
if bytesBuf == nil {
- if batchIndex >= 0 {
- lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+ // CRITICAL: Check if subscription is still active BEFORE waiting
+ // This prevents infinite loops when client has disconnected
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: waitForDataFn returned false, subscription ending", readerName)
+ return lastReadPosition, true, nil
+ }
+
+ if offset >= 0 {
+ lastReadPosition = NewMessagePosition(lastReadPosition.Time.UnixNano(), offset)
}
if stopTsNs != 0 {
isDone = true
return
}
+
+ // CRITICAL FIX: If we're reading offset-based and there's no data in LogBuffer,
+ // return ResumeFromDiskError to let Subscribe try reading from disk again.
+ // This prevents infinite blocking when all data is on disk (e.g., after restart).
+ if startPosition.IsOffsetBased {
+ glog.V(4).Infof("%s: No data in LogBuffer for offset-based read at %v, checking if client still connected", readerName, lastReadPosition)
+ // Check if client is still connected before busy-looping
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: Client disconnected, stopping offset-based read", readerName)
+ return lastReadPosition, true, nil
+ }
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, retry immediately
+ glog.V(3).Infof("%s: Woke up from notification for offset-based read", readerName)
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, retry anyway (fallback for edge cases)
+ glog.V(4).Infof("%s: Notification timeout for offset-based, polling", readerName)
+ }
+ return lastReadPosition, isDone, ResumeFromDiskError
+ }
+
lastTsNs := logBuffer.LastTsNs.Load()
for lastTsNs == logBuffer.LastTsNs.Load() {
- if waitForDataFn() {
- continue
- } else {
- isDone = true
- return
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: Client disconnected during timestamp wait", readerName)
+ return lastReadPosition, true, nil
+ }
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, break and retry read
+ glog.V(3).Infof("%s: Woke up from notification (main loop)", readerName)
+ break
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, check if timestamp changed
+ if lastTsNs != logBuffer.LastTsNs.Load() {
+ break
+ }
+ glog.V(4).Infof("%s: Notification timeout (main loop), polling", readerName)
}
}
if logBuffer.IsStopping() {
- isDone = true
- return
+ glog.V(4).Infof("%s: LogBuffer is stopping", readerName)
+ return lastReadPosition, true, nil
}
continue
}
buf := bytesBuf.Bytes()
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
+ glog.V(4).Infof("Processing buffer with %d bytes for %s", len(buf), readerName)
+
+ // If buffer is empty, check if client is still connected before looping
+ if len(buf) == 0 {
+ glog.V(4).Infof("Empty buffer for %s, checking if client still connected", readerName)
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: Client disconnected on empty buffer", readerName)
+ return lastReadPosition, true, nil
+ }
+ // Sleep to avoid busy-wait on empty buffer
+ time.Sleep(10 * time.Millisecond)
+ continue
+ }
batchSize := 0
@@ -196,7 +343,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
err = ResumeError
- glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
+ glog.Errorf("LoopProcessLogDataWithOffset: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
return
}
entryData := buf[pos+4 : pos+4+int(size)]
@@ -207,19 +354,39 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
pos += 4 + int(size)
continue
}
+
+ glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
+
+ // Handle offset-based filtering for offset-based start positions
+ if startPosition.IsOffsetBased {
+ startOffset := startPosition.GetOffset()
+ glog.V(4).Infof("Offset-based filtering: logEntry.Offset=%d, startOffset=%d", logEntry.Offset, startOffset)
+ if logEntry.Offset < startOffset {
+ // Skip entries before the starting offset
+ glog.V(4).Infof("Skipping entry due to offset filter")
+ pos += 4 + int(size)
+ batchSize++
+ continue
+ }
+ }
+
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ glog.V(4).Infof("Stopping due to stopTsNs")
isDone = true
// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
return
}
- lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
+ // CRITICAL FIX: Use logEntry.Offset + 1 to move PAST the current entry
+ // This prevents infinite loops where we keep requesting the same offset
+ lastReadPosition = NewMessagePosition(logEntry.TsNs, logEntry.Offset+1)
- if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil {
- glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
+ glog.V(4).Infof("Calling eachLogDataFn for entry at offset %d, next position will be %d", logEntry.Offset, logEntry.Offset+1)
+ if isDone, err = eachLogDataFn(logEntry, logEntry.Offset); err != nil {
+ glog.Errorf("LoopProcessLogDataWithOffset: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
return
}
if isDone {
- glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1)
+ glog.V(0).Infof("LoopProcessLogDataWithOffset: %s process log entry %d", readerName, batchSize+1)
return
}