Diffstat (limited to 'weed/util')
-rw-r--r--  weed/util/log_buffer/log_buffer.go                    114
-rw-r--r--  weed/util/log_buffer/log_buffer_flush_gap_test.go     680
-rw-r--r--  weed/util/log_buffer/log_read.go                        1
-rw-r--r--  weed/util/log_buffer/log_read_integration_test.go     353
-rw-r--r--  weed/util/log_buffer/log_read_stateless.go            639
-rw-r--r--  weed/util/log_buffer/log_read_stateless_test.go       372
6 files changed, 2133 insertions, 26 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index aff8ec80b..692c47b44 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,8 +2,8 @@ package log_buffer
import (
"bytes"
+ "fmt"
"math"
- "strings"
"sync"
"sync/atomic"
"time"
@@ -33,6 +33,21 @@ type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
+// DiskChunkCache caches chunks of historical data read from disk
+type DiskChunkCache struct {
+ mu sync.RWMutex
+ chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
+ maxChunks int // Maximum number of chunks to cache
+}
+
+// CachedDiskChunk represents a cached chunk of disk data
+type CachedDiskChunk struct {
+ startOffset int64
+ endOffset int64
+ messages []*filer_pb.LogEntry
+ lastAccess time.Time
+}
+
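+// Chunks are keyed by their start offset aligned down to the chunk size used
+// by the disk-read path (1000 entries per chunk): offsets 0-999 live under
+// key 0, offsets 1000-1999 under key 1000, and so on, with at most maxChunks
+// chunks retained under an LRU policy.
+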
type LogBuffer struct {
LastFlushTsNs int64
name string
@@ -63,6 +78,8 @@ type LogBuffer struct {
hasOffsets bool
lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
lastFlushedTime atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
+ // Disk chunk cache for historical data reads
+ diskChunkCache *DiskChunkCache
sync.RWMutex
}
@@ -81,6 +98,10 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
offset: 0, // Will be initialized from existing data if available
+ diskChunkCache: &DiskChunkCache{
+ chunks: make(map[int64]*CachedDiskChunk),
+ maxChunks: 16, // Cache up to 16 chunks (configurable)
+ },
}
lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
go lb.loopFlush()
@@ -359,17 +380,52 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
if logBuffer.LastTsNs.Load() >= processingTsNs {
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
- // Re-marshal with corrected timestamp
logEntry.TsNs = processingTsNs
- logEntryData, _ = proto.Marshal(logEntry)
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
+ // CRITICAL FIX: Set the offset in the LogEntry before marshaling
+ // This ensures the flushed data contains the correct offset information
+ // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
+ logEntry.Offset = logBuffer.offset
+
+ // DEBUG: Log data being added to buffer for GitHub Actions debugging
+ dataPreview := ""
+ if len(data) > 0 {
+ if len(data) <= 50 {
+ dataPreview = string(data)
+ } else {
+ dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
+ }
+ }
+ glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q",
+ logBuffer.name, logBuffer.offset, len(data), dataPreview)
+
+ // Marshal with correct timestamp and offset
+ logEntryData, _ = proto.Marshal(logEntry)
+
size := len(logEntryData)
if logBuffer.pos == 0 {
logBuffer.startTime = ts
+ // Reset offset tracking for new buffer
+ logBuffer.hasOffsets = false
+ }
+
+ // Track offset ranges for Kafka integration
+ // CRITICAL FIX: Track the current offset being written
+ if !logBuffer.hasOffsets {
+ logBuffer.minOffset = logBuffer.offset
+ logBuffer.maxOffset = logBuffer.offset
+ logBuffer.hasOffsets = true
+ } else {
+ if logBuffer.offset < logBuffer.minOffset {
+ logBuffer.minOffset = logBuffer.offset
+ }
+ if logBuffer.offset > logBuffer.maxOffset {
+ logBuffer.maxOffset = logBuffer.offset
+ }
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
@@ -397,6 +453,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
+ logBuffer.offset++
}
func (logBuffer *LogBuffer) IsStopping() bool {
@@ -540,11 +597,29 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
logBuffer.hasOffsets = false
logBuffer.minOffset = 0
logBuffer.maxOffset = 0
+
+ // CRITICAL FIX: Invalidate disk cache chunks after flush
+ // The cache may contain stale data from before this flush
+ // Invalidating ensures consumers will re-read fresh data from disk after flush
+ logBuffer.invalidateAllDiskCacheChunks()
+
return d
}
return nil
}
+// invalidateAllDiskCacheChunks clears all cached disk chunks
+// This should be called after a buffer flush to ensure consumers read fresh data from disk
+func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
+ logBuffer.diskChunkCache.mu.Lock()
+ defer logBuffer.diskChunkCache.mu.Unlock()
+
+ if len(logBuffer.diskChunkCache.chunks) > 0 {
+ glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks))
+ logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
+ }
+}
+
func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
@@ -570,12 +645,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if isOffsetBased {
requestedOffset := lastReadPosition.Offset
- // DEBUG: Log buffer state for _schemas topic
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d",
- requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load())
- }
-
// Check if the requested offset is in the current buffer range
if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
// If current buffer is empty (pos=0), check if data is on disk or not yet written
@@ -593,10 +662,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// Case 3: try disk read (historical data might exist)
if requestedOffset < logBuffer.offset {
// Data was in the buffer range but buffer is now empty = flushed to disk
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)",
- requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset)
- }
return nil, -2, ResumeFromDiskError
}
// requestedOffset == logBuffer.offset: Current position
@@ -604,20 +669,11 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// (historical data might exist from previous runs)
if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
// Initial state: try disk read before waiting for new data
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0")
- }
return nil, -2, ResumeFromDiskError
}
// Otherwise, wait for new data to arrive
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset)
- }
return nil, logBuffer.offset, nil
}
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos)
- }
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
@@ -661,25 +717,31 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// if td < tm, case 2.3
// read from disk again
var tsMemory time.Time
- var tsBatchIndex int64
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
- tsBatchIndex = logBuffer.offset
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
- tsBatchIndex = prevBuf.offset
}
}
if tsMemory.IsZero() { // case 2.2
return nil, -2, nil
- } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+ } else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
+ // CRITICAL FIX: For time-based reads, only check timestamp for disk reads
+ // Don't use offset comparisons as they're not meaningful for time-based subscriptions
+
// Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
// This handles queries that want to read all data without knowing the exact start time
if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
// Start from the beginning of memory
// Fall through to case 2.1 to read from earliest buffer
+ } else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) {
+ // CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory,
+ // it's likely a race between starting to read and first message being written
+ // Fall through to case 2.1 to read from earliest buffer instead of triggering disk read
+ glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory",
+ lastReadPosition.Time, tsMemory)
} else {
// Data not in memory buffers - read from disk
glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",
diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go
new file mode 100644
index 000000000..63d344b1a
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go
@@ -0,0 +1,680 @@
+package log_buffer
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/protobuf/proto"
+)
+
+// TestFlushOffsetGap_ReproduceDataLoss reproduces the critical bug where messages
+// are lost in the gap between flushed disk data and in-memory buffer.
+//
+// OBSERVED BEHAVIOR FROM LOGS:
+// Request offset: 1764
+// Disk contains: 1000-1763 (764 messages)
+// Memory buffer starts at: 1800
+// Gap: 1764-1799 (36 messages) ← MISSING!
+//
+// This test verifies:
+// 1. All messages sent to buffer are accounted for
+// 2. No gaps exist between disk and memory offsets
+// 3. Flushed data and in-memory data have continuous offset ranges
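+//
+// The continuity invariant checked below is bufferStartOffset == maxFlushedOffset+1:
+// a positive gap means offsets were lost, a negative one means they overlap.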
+func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
+ var flushedMessages []*filer_pb.LogEntry
+ var flushMu sync.Mutex
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf))
+
+ // Parse and store flushed messages
+ flushMu.Lock()
+ defer flushMu.Unlock()
+
+ // Parse buffer to extract messages
+ parsedCount := 0
+ for pos := 0; pos+4 < len(buf); {
+ if pos+4 > len(buf) {
+ break
+ }
+
+ size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
+ if pos+4+int(size) > len(buf) {
+ break
+ }
+
+ entryData := buf[pos+4 : pos+4+int(size)]
+ logEntry := &filer_pb.LogEntry{}
+ if err := proto.Unmarshal(entryData, logEntry); err == nil {
+ flushedMessages = append(flushedMessages, logEntry)
+ parsedCount++
+ }
+
+ pos += 4 + int(size)
+ }
+
+ t.Logf(" Parsed %d messages from flush buffer", parsedCount)
+ }
+
+ logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Send 100 messages
+ messageCount := 100
+ t.Logf("Sending %d messages...", messageCount)
+
+ for i := 0; i < messageCount; i++ {
+ logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte(fmt.Sprintf("key-%d", i)),
+ Value: []byte(fmt.Sprintf("message-%d", i)),
+ TsNs: time.Now().UnixNano(),
+ })
+ }
+
+ // Force flush multiple times to simulate real workload
+ t.Logf("Forcing flush...")
+ logBuffer.ForceFlush()
+
+ // Add more messages after flush
+ for i := messageCount; i < messageCount+50; i++ {
+ logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte(fmt.Sprintf("key-%d", i)),
+ Value: []byte(fmt.Sprintf("message-%d", i)),
+ TsNs: time.Now().UnixNano(),
+ })
+ }
+
+ // Force another flush
+ logBuffer.ForceFlush()
+ time.Sleep(200 * time.Millisecond) // Wait for flush to complete
+
+ // Now check the buffer state
+ logBuffer.RLock()
+ bufferStartOffset := logBuffer.bufferStartOffset
+ currentOffset := logBuffer.offset
+ pos := logBuffer.pos
+ logBuffer.RUnlock()
+
+ flushMu.Lock()
+ flushedCount := len(flushedMessages)
+ var maxFlushedOffset int64 = -1
+ var minFlushedOffset int64 = -1
+ if flushedCount > 0 {
+ minFlushedOffset = flushedMessages[0].Offset
+ maxFlushedOffset = flushedMessages[flushedCount-1].Offset
+ }
+ flushMu.Unlock()
+
+ t.Logf("\nBUFFER STATE AFTER FLUSH:")
+ t.Logf(" bufferStartOffset: %d", bufferStartOffset)
+ t.Logf(" currentOffset (HWM): %d", currentOffset)
+ t.Logf(" pos (bytes in buffer): %d", pos)
+ t.Logf(" Messages sent: %d (offsets 0-%d)", messageCount+50, messageCount+49)
+ t.Logf(" Messages flushed to disk: %d (offsets %d-%d)", flushedCount, minFlushedOffset, maxFlushedOffset)
+
+ // CRITICAL CHECK: Is there a gap between flushed data and memory buffer?
+ if flushedCount > 0 && maxFlushedOffset >= 0 {
+ gap := bufferStartOffset - (maxFlushedOffset + 1)
+
+ t.Logf("\nOFFSET CONTINUITY CHECK:")
+ t.Logf(" Last flushed offset: %d", maxFlushedOffset)
+ t.Logf(" Buffer starts at: %d", bufferStartOffset)
+ t.Logf(" Gap: %d offsets", gap)
+
+ if gap > 0 {
+ t.Errorf("❌ CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!")
+ t.Errorf(" Disk has offsets %d-%d", minFlushedOffset, maxFlushedOffset)
+ t.Errorf(" Memory buffer starts at: %d", bufferStartOffset)
+ t.Errorf(" MISSING OFFSETS: %d-%d (%d messages)", maxFlushedOffset+1, bufferStartOffset-1, gap)
+ t.Errorf(" These messages are LOST - neither on disk nor in memory!")
+ } else if gap < 0 {
+ t.Errorf("❌ OFFSET OVERLAP: Memory buffer starts BEFORE last flushed offset!")
+ t.Errorf(" This indicates data corruption or race condition")
+ } else {
+ t.Logf("✅ PASS: No gap detected - offsets are continuous")
+ }
+
+ // Check if we can read all expected offsets
+ t.Logf("\nREADABILITY CHECK:")
+ for testOffset := int64(0); testOffset < currentOffset; testOffset += 10 {
+ // Try to read from buffer
+ requestPosition := NewMessagePositionFromOffset(testOffset)
+ buf, _, err := logBuffer.ReadFromBuffer(requestPosition)
+
+ isReadable := (buf != nil && len(buf.Bytes()) > 0) || err == ResumeFromDiskError
+ status := "✅"
+ if !isReadable && err == nil {
+ status = "❌ NOT READABLE"
+ }
+
+ t.Logf(" Offset %d: %s (buf=%v, err=%v)", testOffset, status, buf != nil, err)
+
+ // If offset is in the gap, it should fail to read
+ if flushedCount > 0 && testOffset > maxFlushedOffset && testOffset < bufferStartOffset {
+ if isReadable {
+ t.Errorf(" Unexpected: Offset %d in gap range should NOT be readable!", testOffset)
+ } else {
+ t.Logf(" Expected: Offset %d in gap is not readable (data lost)", testOffset)
+ }
+ }
+ }
+ }
+
+ // Check that all sent messages are accounted for
+ expectedMessageCount := messageCount + 50
+ messagesInMemory := int(currentOffset - bufferStartOffset)
+ totalAccountedFor := flushedCount + messagesInMemory
+
+ t.Logf("\nMESSAGE ACCOUNTING:")
+ t.Logf(" Expected: %d messages", expectedMessageCount)
+ t.Logf(" Flushed to disk: %d", flushedCount)
+ t.Logf(" In memory buffer: %d (offset range %d-%d)", messagesInMemory, bufferStartOffset, currentOffset-1)
+ t.Logf(" Total accounted for: %d", totalAccountedFor)
+ t.Logf(" Missing: %d messages", expectedMessageCount-totalAccountedFor)
+
+ if totalAccountedFor < expectedMessageCount {
+ t.Errorf("❌ DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor)
+ } else {
+ t.Logf("✅ All messages accounted for")
+ }
+}
+
+// TestFlushOffsetGap_CheckPrevBuffers tests if messages might be stuck in prevBuffers
+// instead of being properly flushed to disk.
+func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
+ var flushCount int
+ var flushMu sync.Mutex
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ flushMu.Lock()
+ flushCount++
+ count := flushCount
+ flushMu.Unlock()
+
+ t.Logf("FLUSH #%d: minOffset=%d maxOffset=%d size=%d bytes", count, minOffset, maxOffset, len(buf))
+ }
+
+ logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Send messages in batches with flushes in between
+ for batch := 0; batch < 5; batch++ {
+ t.Logf("\nBatch %d:", batch)
+
+ // Send 20 messages
+ for i := 0; i < 20; i++ {
+ offset := int64(batch*20 + i)
+ logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte(fmt.Sprintf("key-%d", offset)),
+ Value: []byte(fmt.Sprintf("message-%d", offset)),
+ TsNs: time.Now().UnixNano(),
+ })
+ }
+
+ // Check state before flush
+ logBuffer.RLock()
+ beforeFlushOffset := logBuffer.offset
+ beforeFlushStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+
+ // Force flush
+ logBuffer.ForceFlush()
+ time.Sleep(50 * time.Millisecond)
+
+ // Check state after flush
+ logBuffer.RLock()
+ afterFlushOffset := logBuffer.offset
+ afterFlushStart := logBuffer.bufferStartOffset
+ prevBufferCount := len(logBuffer.prevBuffers.buffers)
+
+ // Check prevBuffers state
+ t.Logf(" Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart)
+ t.Logf(" After flush: offset=%d, bufferStartOffset=%d, prevBuffers=%d",
+ afterFlushOffset, afterFlushStart, prevBufferCount)
+
+ // Check each prevBuffer
+ for i, prevBuf := range logBuffer.prevBuffers.buffers {
+ if prevBuf.size > 0 {
+ t.Logf(" prevBuffer[%d]: offsets %d-%d, size=%d bytes (NOT FLUSHED!)",
+ i, prevBuf.startOffset, prevBuf.offset, prevBuf.size)
+ }
+ }
+ logBuffer.RUnlock()
+
+ // CRITICAL: Check if bufferStartOffset advanced correctly
+ expectedNewStart := beforeFlushOffset
+ if afterFlushStart != expectedNewStart {
+ t.Errorf(" ❌ bufferStartOffset mismatch!")
+ t.Errorf(" Expected: %d (= offset before flush)", expectedNewStart)
+ t.Errorf(" Actual: %d", afterFlushStart)
+ t.Errorf(" Gap: %d offsets", expectedNewStart-afterFlushStart)
+ }
+ }
+}
+
+// TestFlushOffsetGap_ConcurrentWriteAndFlush tests for race conditions
+// between writing new messages and flushing old ones.
+func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
+ var allFlushedOffsets []int64
+ var flushMu sync.Mutex
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf))
+
+ flushMu.Lock()
+ // Record the offset range that was flushed
+ for offset := minOffset; offset <= maxOffset; offset++ {
+ allFlushedOffsets = append(allFlushedOffsets, offset)
+ }
+ flushMu.Unlock()
+ }
+
+ logBuffer := NewLogBuffer("test", 50*time.Millisecond, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Concurrently write messages and force flushes
+ var wg sync.WaitGroup
+
+ // Writer goroutine
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < 200; i++ {
+ logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte(fmt.Sprintf("key-%d", i)),
+ Value: []byte(fmt.Sprintf("message-%d", i)),
+ TsNs: time.Now().UnixNano(),
+ })
+ if i%50 == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+ }()
+
+ // Flusher goroutine
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < 5; i++ {
+ time.Sleep(30 * time.Millisecond)
+ logBuffer.ForceFlush()
+ }
+ }()
+
+ wg.Wait()
+ time.Sleep(200 * time.Millisecond) // Wait for final flush
+
+ // Check final state
+ logBuffer.RLock()
+ finalOffset := logBuffer.offset
+ finalBufferStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+
+ flushMu.Lock()
+ flushedCount := len(allFlushedOffsets)
+ flushMu.Unlock()
+
+ expectedCount := int(finalOffset)
+ inMemory := int(finalOffset - finalBufferStart)
+ totalAccountedFor := flushedCount + inMemory
+
+ t.Logf("\nFINAL STATE:")
+ t.Logf(" Total messages sent: %d (offsets 0-%d)", expectedCount, expectedCount-1)
+ t.Logf(" Flushed to disk: %d", flushedCount)
+ t.Logf(" In memory: %d (offsets %d-%d)", inMemory, finalBufferStart, finalOffset-1)
+ t.Logf(" Total accounted: %d", totalAccountedFor)
+ t.Logf(" Missing: %d", expectedCount-totalAccountedFor)
+
+ if totalAccountedFor < expectedCount {
+ t.Errorf("❌ DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor)
+ }
+}
+
+// TestFlushOffsetGap_ProductionScenario reproduces the actual production scenario
+// where the broker uses AddLogEntryToBuffer with explicit Kafka offsets.
+// This simulates leader publishing with offset assignment.
+func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
+ var flushedData []struct {
+ minOffset int64
+ maxOffset int64
+ messages []*filer_pb.LogEntry
+ }
+ var flushMu sync.Mutex
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ // Parse messages from buffer
+ messages := []*filer_pb.LogEntry{}
+ for pos := 0; pos+4 < len(buf); {
+ size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
+ if pos+4+int(size) > len(buf) {
+ break
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+ logEntry := &filer_pb.LogEntry{}
+ if err := proto.Unmarshal(entryData, logEntry); err == nil {
+ messages = append(messages, logEntry)
+ }
+ pos += 4 + int(size)
+ }
+
+ flushMu.Lock()
+ flushedData = append(flushedData, struct {
+ minOffset int64
+ maxOffset int64
+ messages []*filer_pb.LogEntry
+ }{minOffset, maxOffset, messages})
+ flushMu.Unlock()
+
+ t.Logf("FLUSH: minOffset=%d maxOffset=%d, parsed %d messages", minOffset, maxOffset, len(messages))
+ }
+
+ logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Simulate broker behavior: assign Kafka offsets and add to buffer
+ // This is what PublishWithOffset() does
+ nextKafkaOffset := int64(0)
+
+ // Round 1: Add 50 messages with Kafka offsets 0-49
+ t.Logf("\n=== ROUND 1: Adding messages 0-49 ===")
+ for i := 0; i < 50; i++ {
+ logEntry := &filer_pb.LogEntry{
+ Key: []byte(fmt.Sprintf("key-%d", i)),
+ Data: []byte(fmt.Sprintf("message-%d", i)),
+ TsNs: time.Now().UnixNano(),
+ Offset: nextKafkaOffset, // Explicit Kafka offset
+ }
+ logBuffer.AddLogEntryToBuffer(logEntry)
+ nextKafkaOffset++
+ }
+
+ // Check buffer state before flush
+ logBuffer.RLock()
+ beforeFlushOffset := logBuffer.offset
+ beforeFlushStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+ t.Logf("Before flush: logBuffer.offset=%d, bufferStartOffset=%d, nextKafkaOffset=%d",
+ beforeFlushOffset, beforeFlushStart, nextKafkaOffset)
+
+ // Flush
+ logBuffer.ForceFlush()
+ time.Sleep(100 * time.Millisecond)
+
+ // Check buffer state after flush
+ logBuffer.RLock()
+ afterFlushOffset := logBuffer.offset
+ afterFlushStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+ t.Logf("After flush: logBuffer.offset=%d, bufferStartOffset=%d",
+ afterFlushOffset, afterFlushStart)
+
+ // Round 2: Add another 50 messages with Kafka offsets 50-99
+ t.Logf("\n=== ROUND 2: Adding messages 50-99 ===")
+ for i := 0; i < 50; i++ {
+ logEntry := &filer_pb.LogEntry{
+ Key: []byte(fmt.Sprintf("key-%d", 50+i)),
+ Data: []byte(fmt.Sprintf("message-%d", 50+i)),
+ TsNs: time.Now().UnixNano(),
+ Offset: nextKafkaOffset,
+ }
+ logBuffer.AddLogEntryToBuffer(logEntry)
+ nextKafkaOffset++
+ }
+
+ logBuffer.ForceFlush()
+ time.Sleep(100 * time.Millisecond)
+
+ // Verification: Check if all Kafka offsets are accounted for
+ flushMu.Lock()
+ t.Logf("\n=== VERIFICATION ===")
+ t.Logf("Expected Kafka offsets: 0-%d", nextKafkaOffset-1)
+
+ allOffsets := make(map[int64]bool)
+ for flushIdx, flush := range flushedData {
+ t.Logf("Flush #%d: minOffset=%d, maxOffset=%d, messages=%d",
+ flushIdx, flush.minOffset, flush.maxOffset, len(flush.messages))
+
+ for _, msg := range flush.messages {
+ if allOffsets[msg.Offset] {
+ t.Errorf(" ❌ DUPLICATE: Offset %d appears multiple times!", msg.Offset)
+ }
+ allOffsets[msg.Offset] = true
+ }
+ }
+ flushMu.Unlock()
+
+ // Check for missing offsets
+ missingOffsets := []int64{}
+ for expectedOffset := int64(0); expectedOffset < nextKafkaOffset; expectedOffset++ {
+ if !allOffsets[expectedOffset] {
+ missingOffsets = append(missingOffsets, expectedOffset)
+ }
+ }
+
+ if len(missingOffsets) > 0 {
+ t.Errorf("\n❌ MISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets))
+ if len(missingOffsets) <= 20 {
+ t.Errorf("Missing: %v", missingOffsets)
+ } else {
+ t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20)
+ }
+ t.Errorf("\nThis reproduces the production bug!")
+ } else {
+ t.Logf("\n✅ SUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1)
+ }
+
+ // Check buffer offset consistency
+ logBuffer.RLock()
+ finalOffset := logBuffer.offset
+ finalBufferStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+
+ t.Logf("\nFinal buffer state:")
+ t.Logf(" logBuffer.offset: %d", finalOffset)
+ t.Logf(" bufferStartOffset: %d", finalBufferStart)
+ t.Logf(" Expected (nextKafkaOffset): %d", nextKafkaOffset)
+
+ if finalOffset != nextKafkaOffset {
+ t.Errorf("❌ logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset)
+ }
+}
+
+// TestFlushOffsetGap_ConcurrentReadDuringFlush tests if concurrent reads
+// during flush can cause messages to be missed.
+func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
+ var flushedOffsets []int64
+ var flushMu sync.Mutex
+
+ readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+ // Simulate reading from disk - return flushed offsets
+ flushMu.Lock()
+ defer flushMu.Unlock()
+
+ for _, offset := range flushedOffsets {
+ if offset >= startPosition.Offset {
+ logEntry := &filer_pb.LogEntry{
+ Key: []byte(fmt.Sprintf("key-%d", offset)),
+ Data: []byte(fmt.Sprintf("message-%d", offset)),
+ TsNs: time.Now().UnixNano(),
+ Offset: offset,
+ }
+ isDone, err := eachLogEntryFn(logEntry)
+ if err != nil || isDone {
+ return NewMessagePositionFromOffset(offset + 1), isDone, err
+ }
+ }
+ }
+ return startPosition, false, nil
+ }
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ // Parse and store flushed offsets
+ flushMu.Lock()
+ defer flushMu.Unlock()
+
+ for pos := 0; pos+4 < len(buf); {
+ size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
+ if pos+4+int(size) > len(buf) {
+ break
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+ logEntry := &filer_pb.LogEntry{}
+ if err := proto.Unmarshal(entryData, logEntry); err == nil {
+ flushedOffsets = append(flushedOffsets, logEntry.Offset)
+ }
+ pos += 4 + int(size)
+ }
+
+ t.Logf("FLUSH: Stored %d offsets to disk (minOffset=%d, maxOffset=%d)",
+ len(flushedOffsets), minOffset, maxOffset)
+ }
+
+ logBuffer := NewLogBuffer("test", time.Hour, flushFn, readFromDiskFn, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Add 100 messages
+ t.Logf("Adding 100 messages...")
+ for i := int64(0); i < 100; i++ {
+ logEntry := &filer_pb.LogEntry{
+ Key: []byte(fmt.Sprintf("key-%d", i)),
+ Data: []byte(fmt.Sprintf("message-%d", i)),
+ TsNs: time.Now().UnixNano(),
+ Offset: i,
+ }
+ logBuffer.AddLogEntryToBuffer(logEntry)
+ }
+
+ // Flush (moves data to disk)
+ t.Logf("Flushing...")
+ logBuffer.ForceFlush()
+ time.Sleep(100 * time.Millisecond)
+
+ // Now try to read all messages using ReadMessagesAtOffset
+ t.Logf("\nReading messages from offset 0...")
+ messages, nextOffset, hwm, endOfPartition, err := logBuffer.ReadMessagesAtOffset(0, 1000, 1024*1024)
+
+ t.Logf("Read result: messages=%d, nextOffset=%d, hwm=%d, endOfPartition=%v, err=%v",
+ len(messages), nextOffset, hwm, endOfPartition, err)
+
+ // Verify all offsets can be read
+ readOffsets := make(map[int64]bool)
+ for _, msg := range messages {
+ readOffsets[msg.Offset] = true
+ }
+
+ missingOffsets := []int64{}
+ for expectedOffset := int64(0); expectedOffset < 100; expectedOffset++ {
+ if !readOffsets[expectedOffset] {
+ missingOffsets = append(missingOffsets, expectedOffset)
+ }
+ }
+
+ if len(missingOffsets) > 0 {
+ t.Errorf("❌ MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets))
+ if len(missingOffsets) <= 20 {
+ t.Errorf("Missing: %v", missingOffsets)
+ } else {
+ t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20)
+ }
+ } else {
+ t.Logf("✅ All 100 offsets can be read after flush")
+ }
+}
+
+// TestFlushOffsetGap_ForceFlushAdvancesBuffer tests if ForceFlush
+// properly advances bufferStartOffset after flushing.
+func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
+ flushedRanges := []struct{ min, max int64 }{}
+ var flushMu sync.Mutex
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ flushMu.Lock()
+ flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset})
+ flushMu.Unlock()
+ t.Logf("FLUSH: offsets %d-%d", minOffset, maxOffset)
+ }
+
+ logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) // Long interval, manual flush only
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Send messages, flush, check state - repeat
+ for round := 0; round < 3; round++ {
+ t.Logf("\n=== ROUND %d ===", round)
+
+ // Check state before adding messages
+ logBuffer.RLock()
+ beforeOffset := logBuffer.offset
+ beforeStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+
+ t.Logf("Before adding: offset=%d, bufferStartOffset=%d", beforeOffset, beforeStart)
+
+ // Add 10 messages
+ for i := 0; i < 10; i++ {
+ logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte(fmt.Sprintf("round-%d-msg-%d", round, i)),
+ Value: []byte(fmt.Sprintf("data-%d-%d", round, i)),
+ TsNs: time.Now().UnixNano(),
+ })
+ }
+
+ // Check state after adding
+ logBuffer.RLock()
+ afterAddOffset := logBuffer.offset
+ afterAddStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+
+ t.Logf("After adding: offset=%d, bufferStartOffset=%d", afterAddOffset, afterAddStart)
+
+ // Force flush
+ t.Logf("Forcing flush...")
+ logBuffer.ForceFlush()
+ time.Sleep(100 * time.Millisecond)
+
+ // Check state after flush
+ logBuffer.RLock()
+ afterFlushOffset := logBuffer.offset
+ afterFlushStart := logBuffer.bufferStartOffset
+ logBuffer.RUnlock()
+
+ t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart)
+
+ // CRITICAL CHECK: bufferStartOffset should advance to where offset was before flush
+ if afterFlushStart != afterAddOffset {
+ t.Errorf("❌ FLUSH BUG: bufferStartOffset did NOT advance correctly!")
+ t.Errorf(" Expected bufferStartOffset=%d (= offset after add)", afterAddOffset)
+ t.Errorf(" Actual bufferStartOffset=%d", afterFlushStart)
+ t.Errorf(" Gap: %d offsets WILL BE LOST", afterAddOffset-afterFlushStart)
+ } else {
+ t.Logf("✅ bufferStartOffset correctly advanced to %d", afterFlushStart)
+ }
+ }
+
+ // Final verification: check all offset ranges are continuous
+ flushMu.Lock()
+ t.Logf("\n=== FLUSHED RANGES ===")
+ for i, r := range flushedRanges {
+ t.Logf("Flush #%d: offsets %d-%d", i, r.min, r.max)
+
+ // Check continuity with previous flush
+ if i > 0 {
+ prevMax := flushedRanges[i-1].max
+ currentMin := r.min
+ gap := currentMin - (prevMax + 1)
+
+ if gap > 0 {
+ t.Errorf("❌ GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap)
+ } else if gap < 0 {
+ t.Errorf("❌ OVERLAP between flush #%d and #%d: %d offsets duplicated!", i-1, i, -gap)
+ } else {
+ t.Logf(" ✅ Continuous with previous flush")
+ }
+ }
+ }
+ flushMu.Unlock()
+}
+
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 77f03ddb8..3b7b99ada 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -355,6 +355,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
continue
}
+
glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
// Handle offset-based filtering for offset-based start positions
diff --git a/weed/util/log_buffer/log_read_integration_test.go b/weed/util/log_buffer/log_read_integration_test.go
new file mode 100644
index 000000000..38549b9f7
--- /dev/null
+++ b/weed/util/log_buffer/log_read_integration_test.go
@@ -0,0 +1,353 @@
+package log_buffer
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// TestConcurrentProducerConsumer simulates the integration test scenario:
+// - One producer writing messages continuously
+// - Multiple consumers reading from different offsets
+// - Consumers reading sequentially (like Kafka consumers)
+func TestConcurrentProducerConsumer(t *testing.T) {
+ lb := NewLogBuffer("integration-test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ const numMessages = 1000
+ const numConsumers = 2
+ const messagesPerConsumer = numMessages / numConsumers
+
+ // Start producer
+ producerDone := make(chan bool)
+ go func() {
+ for i := 0; i < numMessages; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ time.Sleep(1 * time.Millisecond) // Simulate production rate
+ }
+ producerDone <- true
+ }()
+
+ // Start consumers
+ consumerWg := sync.WaitGroup{}
+ consumerErrors := make(chan error, numConsumers)
+ consumedCounts := make([]int64, numConsumers)
+
+ for consumerID := 0; consumerID < numConsumers; consumerID++ {
+ consumerWg.Add(1)
+ go func(id int, startOffset int64, endOffset int64) {
+ defer consumerWg.Done()
+
+ currentOffset := startOffset
+ for currentOffset < endOffset {
+ // Read 10 messages at a time (like integration test)
+ messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
+ if err != nil {
+ consumerErrors <- err
+ return
+ }
+
+ if len(messages) == 0 {
+ // No data yet, wait a bit
+ time.Sleep(5 * time.Millisecond)
+ continue
+ }
+
+ // Count only messages in this consumer's assigned range
+ messagesInRange := 0
+ for i, msg := range messages {
+ if msg.Offset >= startOffset && msg.Offset < endOffset {
+ messagesInRange++
+ expectedOffset := currentOffset + int64(i)
+ if msg.Offset != expectedOffset {
+ t.Errorf("Consumer %d: Expected offset %d, got %d", id, expectedOffset, msg.Offset)
+ }
+ }
+ }
+
+ atomic.AddInt64(&consumedCounts[id], int64(messagesInRange))
+ currentOffset = nextOffset
+ }
+ }(consumerID, int64(consumerID*messagesPerConsumer), int64((consumerID+1)*messagesPerConsumer))
+ }
+
+ // Wait for producer to finish
+ <-producerDone
+
+ // Wait for consumers (with timeout)
+ done := make(chan bool)
+ go func() {
+ consumerWg.Wait()
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ // Success
+ case err := <-consumerErrors:
+ t.Fatalf("Consumer error: %v", err)
+ case <-time.After(10 * time.Second):
+ t.Fatal("Timeout waiting for consumers to finish")
+ }
+
+ // Verify all messages were consumed
+ totalConsumed := int64(0)
+ for i, count := range consumedCounts {
+ t.Logf("Consumer %d consumed %d messages", i, count)
+ totalConsumed += count
+ }
+
+ if totalConsumed != numMessages {
+ t.Errorf("Expected to consume %d messages, but consumed %d", numMessages, totalConsumed)
+ }
+}
+
+// TestBackwardSeeksWhileProducing simulates consumer rebalancing where
+// consumers seek backward to earlier offsets while producer is still writing
+func TestBackwardSeeksWhileProducing(t *testing.T) {
+ lb := NewLogBuffer("backward-seek-test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ const numMessages = 500
+ const numSeeks = 10
+
+ // Start producer
+ producerDone := make(chan bool)
+ go func() {
+ for i := 0; i < numMessages; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ time.Sleep(1 * time.Millisecond)
+ }
+ producerDone <- true
+ }()
+
+ // Consumer that seeks backward periodically
+ consumerDone := make(chan bool)
+ readOffsets := make(map[int64]int) // Track how many times each offset was read
+
+ go func() {
+ currentOffset := int64(0)
+ seeksRemaining := numSeeks
+
+ for currentOffset < numMessages {
+ // Read some messages
+ messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
+ if err != nil {
+ // For stateless reads, "offset out of range" means data not in memory yet
+ // This is expected when reading historical data or before production starts
+ time.Sleep(5 * time.Millisecond)
+ continue
+ }
+
+ if len(messages) == 0 {
+ // No data available yet or caught up to producer
+ if !endOfPartition {
+ // Data might be coming, wait
+ time.Sleep(5 * time.Millisecond)
+ } else {
+ // At end of partition, wait for more production
+ time.Sleep(5 * time.Millisecond)
+ }
+ continue
+ }
+
+ // Track read offsets
+ for _, msg := range messages {
+ readOffsets[msg.Offset]++
+ }
+
+ // Periodically seek backward (simulating rebalancing)
+ if seeksRemaining > 0 && nextOffset > 50 && nextOffset%100 == 0 {
+ seekOffset := nextOffset - 20
+ t.Logf("Seeking backward from %d to %d", nextOffset, seekOffset)
+ currentOffset = seekOffset
+ seeksRemaining--
+ } else {
+ currentOffset = nextOffset
+ }
+ }
+
+ consumerDone <- true
+ }()
+
+ // Wait for both
+ <-producerDone
+ <-consumerDone
+
+ // Verify each offset was read at least once
+ for i := int64(0); i < numMessages; i++ {
+ if readOffsets[i] == 0 {
+ t.Errorf("Offset %d was never read", i)
+ }
+ }
+
+ t.Logf("Total unique offsets read: %d out of %d", len(readOffsets), numMessages)
+}
+
+// TestHighConcurrencyReads simulates multiple consumers reading from
+// different offsets simultaneously (stress test)
+func TestHighConcurrencyReads(t *testing.T) {
+ lb := NewLogBuffer("high-concurrency-test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ const numMessages = 1000
+ const numReaders = 10
+
+ // Pre-populate buffer
+ for i := 0; i < numMessages; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Start many concurrent readers at different offsets
+ wg := sync.WaitGroup{}
+ errors := make(chan error, numReaders)
+
+ for reader := 0; reader < numReaders; reader++ {
+ wg.Add(1)
+ go func(startOffset int64) {
+ defer wg.Done()
+
+ // Read 100 messages from this offset
+ currentOffset := startOffset
+ readCount := 0
+
+ for readCount < 100 && currentOffset < numMessages {
+ messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
+ if err != nil {
+ errors <- err
+ return
+ }
+
+ // Verify offsets are sequential
+ for i, msg := range messages {
+ expected := currentOffset + int64(i)
+ if msg.Offset != expected {
+ t.Errorf("Reader at %d: expected offset %d, got %d", startOffset, expected, msg.Offset)
+ }
+ }
+
+ readCount += len(messages)
+ currentOffset = nextOffset
+ }
+ }(int64(reader * 10))
+ }
+
+ // Wait with timeout
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ // Success
+ case err := <-errors:
+ t.Fatalf("Reader error: %v", err)
+ case <-time.After(10 * time.Second):
+ t.Fatal("Timeout waiting for readers")
+ }
+}
+
+// TestRepeatedReadsAtSameOffset simulates what happens when Kafka
+// consumer re-fetches the same offset multiple times (due to timeouts or retries)
+func TestRepeatedReadsAtSameOffset(t *testing.T) {
+ lb := NewLogBuffer("repeated-reads-test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ const numMessages = 100
+
+ // Pre-populate buffer
+ for i := 0; i < numMessages; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Read the same offset multiple times concurrently
+ const numReads = 10
+ const testOffset = int64(50)
+
+ wg := sync.WaitGroup{}
+ results := make([][]*filer_pb.LogEntry, numReads)
+
+ for i := 0; i < numReads; i++ {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ messages, _, _, _, err := lb.ReadMessagesAtOffset(testOffset, 10, 10240)
+ if err != nil {
+ t.Errorf("Read %d error: %v", idx, err)
+ return
+ }
+ results[idx] = messages
+ }(i)
+ }
+
+ wg.Wait()
+
+ // Verify all reads returned the same data
+ firstRead := results[0]
+ for i := 1; i < numReads; i++ {
+ if len(results[i]) != len(firstRead) {
+ t.Errorf("Read %d returned %d messages, expected %d", i, len(results[i]), len(firstRead))
+ }
+
+ for j := range results[i] {
+ if results[i][j].Offset != firstRead[j].Offset {
+ t.Errorf("Read %d message %d has offset %d, expected %d",
+ i, j, results[i][j].Offset, firstRead[j].Offset)
+ }
+ }
+ }
+}
+
+// TestEmptyPartitionPolling simulates consumers polling empty partitions
+// waiting for data (common in Kafka)
+func TestEmptyPartitionPolling(t *testing.T) {
+ lb := NewLogBuffer("empty-partition-test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+ lb.bufferStartOffset = 0
+ lb.offset = 0
+
+ // Try to read from empty partition
+ messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 10240)
+
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+ if len(messages) != 0 {
+ t.Errorf("Expected 0 messages, got %d", len(messages))
+ }
+ if nextOffset != 0 {
+ t.Errorf("Expected nextOffset=0, got %d", nextOffset)
+ }
+ if !endOfPartition {
+ t.Error("Expected endOfPartition=true for future offset")
+ }
+}
diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go
new file mode 100644
index 000000000..b57f7742f
--- /dev/null
+++ b/weed/util/log_buffer/log_read_stateless.go
@@ -0,0 +1,639 @@
+package log_buffer
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "google.golang.org/protobuf/proto"
+)
+
+// ReadMessagesAtOffset provides Kafka-style stateless reads from LogBuffer
+// Each call is completely independent - no state maintained between calls
+// Thread-safe for concurrent reads at different offsets
+//
+// This is the recommended API for stateless clients such as the Kafka gateway
+// Unlike Subscribe loops, this:
+// 1. Returns immediately with available data (or empty if none)
+// 2. Does not maintain any session state
+// 3. Safe for concurrent calls
+// 4. No cancellation/restart complexity
+//
+// Returns:
+// - messages: Array of messages starting at startOffset
+// - nextOffset: Offset to use for next fetch
+// - highWaterMark: Highest offset available in partition
+// - endOfPartition: True if no more data available
+// - err: Any error encountered
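+//
+// A minimal caller-side sketch (fetchOffset and deliver are assumptions, not
+// part of this package):
+//
+//	for {
+//		msgs, next, hwm, done, err := logBuffer.ReadMessagesAtOffset(fetchOffset, 100, 4*1024*1024)
+//		if err != nil {
+//			break // surface or retry the error
+//		}
+//		deliver(msgs)      // hand the batch to the client
+//		fetchOffset = next // resume the next poll here
+//		_ = hwm            // hwm - next approximates consumer lag
+//		if done {
+//			break // endOfPartition: no more data right now
+//		}
+//	}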
+func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages int, maxBytes int) (
+ messages []*filer_pb.LogEntry,
+ nextOffset int64,
+ highWaterMark int64,
+ endOfPartition bool,
+ err error,
+) {
+ glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d",
+ startOffset, maxMessages, maxBytes)
+
+ // Quick validation
+ if maxMessages <= 0 {
+ maxMessages = 100 // Default reasonable batch size
+ }
+ if maxBytes <= 0 {
+ maxBytes = 4 * 1024 * 1024 // 4MB default
+ }
+
+ messages = make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset = startOffset
+
+ // Try to read from in-memory buffers first (hot path)
+ logBuffer.RLock()
+ currentBufferEnd := logBuffer.offset
+ bufferStartOffset := logBuffer.bufferStartOffset
+ highWaterMark = currentBufferEnd
+
+ glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d",
+ startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos)
+
+ // Special case: empty buffer (no data written yet)
+ if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 {
+ logBuffer.RUnlock()
+ glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)")
+ // Return empty result - partition exists but has no data yet
+ // Preserve the requested offset in nextOffset
+ return messages, startOffset, 0, true, nil
+ }
+
+ // Check if requested offset is in current buffer
+ if startOffset >= bufferStartOffset && startOffset < currentBufferEnd {
+ glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers")
+ // Read from current buffer
+ glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d",
+ bufferStartOffset, currentBufferEnd)
+
+ if logBuffer.pos > 0 {
+ // Make a copy of the buffer to avoid concurrent modification
+ bufCopy := make([]byte, logBuffer.pos)
+ copy(bufCopy, logBuffer.buf[:logBuffer.pos])
+ logBuffer.RUnlock() // Release lock early
+
+ // Parse messages from buffer copy
+ messages, nextOffset, _, err = parseMessagesFromBuffer(
+ bufCopy, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.V(4).Infof("[StatelessRead] Read %d messages from current buffer, nextOffset=%d",
+ len(messages), nextOffset)
+
+ // Check if we reached the end
+ endOfPartition = (nextOffset >= currentBufferEnd) && (len(messages) == 0 || len(messages) < maxMessages)
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+ }
+
+ // Buffer is empty but offset is in range - check previous buffers
+ logBuffer.RUnlock()
+
+ // Try previous buffers
+ logBuffer.RLock()
+ for _, prevBuf := range logBuffer.prevBuffers.buffers {
+ if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
+ if prevBuf.size > 0 {
+ // Found in previous buffer
+ bufCopy := make([]byte, prevBuf.size)
+ copy(bufCopy, prevBuf.buf[:prevBuf.size])
+ logBuffer.RUnlock()
+
+ messages, nextOffset, _, err = parseMessagesFromBuffer(
+ bufCopy, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.V(4).Infof("[StatelessRead] Read %d messages from previous buffer, nextOffset=%d",
+ len(messages), nextOffset)
+
+ endOfPartition = false // More data might be in current buffer
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+ }
+ // Empty previous buffer means data was flushed to disk - fall through to disk read
+ glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset)
+ break
+ }
+ }
+ logBuffer.RUnlock()
+
+ // Data not in memory - attempt disk read if configured
+ // CRITICAL FIX: Don't return error here - data may be on disk!
+ // Fall through to disk read logic below
+ glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read",
+ startOffset, bufferStartOffset, currentBufferEnd)
+ // Don't return error - continue to disk read check below
+ } else {
+ // Offset is not in current buffer - check previous buffers FIRST before going to disk
+ // This handles the case where data was just flushed but is still in prevBuffers
+ glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first",
+ startOffset, bufferStartOffset, currentBufferEnd)
+
+ for _, prevBuf := range logBuffer.prevBuffers.buffers {
+ if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
+ if prevBuf.size > 0 {
+ // Found in previous buffer!
+ bufCopy := make([]byte, prevBuf.size)
+ copy(bufCopy, prevBuf.buf[:prevBuf.size])
+ logBuffer.RUnlock()
+
+ messages, nextOffset, _, err = parseMessagesFromBuffer(
+ bufCopy, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d",
+ len(messages), nextOffset)
+
+ endOfPartition = false // More data might exist
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+ }
+ // Empty previous buffer - data was flushed to disk
+ glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset)
+ break
+ }
+ }
+ logBuffer.RUnlock()
+ }
+
+ // At this point the read lock has already been released on every path above
+
+ // Data not in memory - try disk read
+ // This handles two cases:
+ // 1. startOffset < bufferStartOffset: Historical data
+ // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above)
+ if startOffset < currentBufferEnd {
+ glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ")
+
+ // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured
+ if startOffset < bufferStartOffset {
+ glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d",
+ startOffset, bufferStartOffset)
+ } else {
+ glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory",
+ startOffset, bufferStartOffset, currentBufferEnd)
+ }
+
+ // Check if disk read function is configured
+ if logBuffer.ReadFromDiskFn == nil {
+ glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.")
+ if startOffset < bufferStartOffset {
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil",
+ startOffset, bufferStartOffset)
+ }
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil",
+ startOffset, bufferStartOffset, currentBufferEnd)
+ }
+
+ glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...")
+
+ // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
+ // The ReadFromDiskFn should handle its own timeouts and not block indefinitely
+ diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk(
+ logBuffer, startOffset, maxMessages, maxBytes, highWaterMark)
+
+ if diskErr != nil {
+ glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr)
+ // IMPORTANT: Return retryable error instead of silently returning empty!
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr)
+ }
+
+ if len(diskMessages) == 0 {
+ glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)",
+ startOffset, highWaterMark, bufferStartOffset)
+ } else {
+ glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d",
+ len(diskMessages), diskNextOffset)
+ }
+
+ // Return disk data
+ endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages
+ return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil
+ }
+
+ // startOffset >= currentBufferEnd - future offset, no data available yet
+ glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available",
+ startOffset, currentBufferEnd)
+ return messages, startOffset, highWaterMark, true, nil
+}
+
+// readHistoricalDataFromDisk reads messages from disk for historical offsets
+// This is called when the requested offset is older than what's in memory
+// Uses an in-memory cache to avoid repeated disk I/O for the same chunks
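+//
+// Note: the chunk is cached on the assumption (reflected in the callback
+// below) that ReadFromDiskFn delivers entries contiguously starting at the
+// aligned chunk offset, since chunkNextOffset is advanced once per entry.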
+func readHistoricalDataFromDisk(
+ logBuffer *LogBuffer,
+ startOffset int64,
+ maxMessages int,
+ maxBytes int,
+ highWaterMark int64,
+) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
+ const chunkSize = 1000 // Size of each cached chunk
+
+ glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d",
+ startOffset, maxMessages, maxBytes, highWaterMark)
+
+ // Calculate chunk start offset (aligned to chunkSize boundary)
+ chunkStartOffset := (startOffset / chunkSize) * chunkSize
+
+ glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset)
+
+ // Try to get from cache first
+ cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset)
+
+ if cacheHit {
+ // Found in cache - extract requested messages
+ glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d",
+ chunkStartOffset, startOffset, len(cachedMessages))
+
+ result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ // CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk
+ // This means disk files only contain partial data (e.g., 1000-1763) and the
+ // requested offset (e.g., 1764) is in a gap between disk and memory.
+ //
+ // SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset
+ // continue to check memory buffers. The data might be in memory even though
+ // it's not on disk.
+ glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)",
+ startOffset, chunkStartOffset, len(cachedMessages))
+ glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset)
+
+ // Return empty but NO ERROR - this signals "not on disk, try memory"
+ return nil, startOffset, nil
+ }
+
+ // Success - return cached data
+ return result, nextOff, nil
+ }
+
+ glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn",
+ chunkStartOffset)
+
+ // Not in cache - read entire chunk from disk for caching
+ chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize)
+ chunkNextOffset := chunkStartOffset
+
+ // Create a position for the chunk start
+ chunkPosition := MessagePosition{
+ IsOffsetBased: true,
+ Offset: chunkStartOffset,
+ }
+
+ // Define callback to collect the entire chunk
+ eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+ // Read up to chunkSize messages for caching
+ if len(chunkMessages) >= chunkSize {
+ return true, nil
+ }
+
+ chunkMessages = append(chunkMessages, logEntry)
+ chunkNextOffset++
+
+ // Continue reading the chunk
+ return false, nil
+ }
+
+ // Read chunk from disk
+ glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset)
+ _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn)
+
+ if readErr != nil {
+ glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr)
+ return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
+ }
+
+ glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages))
+
+ // Cache the chunk for future reads
+ if len(chunkMessages) > 0 {
+ cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages)
+ glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)",
+ chunkStartOffset, chunkNextOffset-1, len(chunkMessages))
+ } else {
+ glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset)
+ }
+
+ // Extract requested messages from the chunk
+ result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
+ glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr)
+ return result, resNextOffset, resErr
+}
+
+// getCachedDiskChunk retrieves a cached disk chunk if available
+func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) {
+ logBuffer.diskChunkCache.mu.RLock()
+ defer logBuffer.diskChunkCache.mu.RUnlock()
+
+ if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
+ // Update last access time
+ chunk.lastAccess = time.Now()
+ return chunk.messages, true
+ }
+
+ return nil, false
+}
+
+// invalidateCachedDiskChunk removes a chunk from the cache
+// This is called when cached data is found to be incomplete or incorrect
+func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) {
+ logBuffer.diskChunkCache.mu.Lock()
+ defer logBuffer.diskChunkCache.mu.Unlock()
+
+ if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
+ delete(logBuffer.diskChunkCache.chunks, chunkStartOffset)
+ glog.Infof("[DiskCache] Invalidated chunk at offset %d", chunkStartOffset)
+ }
+}
+
+// cacheDiskChunk stores a disk chunk in the cache with LRU eviction
+func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) {
+ logBuffer.diskChunkCache.mu.Lock()
+ defer logBuffer.diskChunkCache.mu.Unlock()
+
+ // Check if we need to evict old chunks (LRU policy)
+ if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks {
+ // Find least recently used chunk
+ var oldestOffset int64
+ var oldestTime time.Time
+ first := true
+
+ for offset, chunk := range logBuffer.diskChunkCache.chunks {
+ if first || chunk.lastAccess.Before(oldestTime) {
+ oldestOffset = offset
+ oldestTime = chunk.lastAccess
+ first = false
+ }
+ }
+
+ // Evict oldest chunk
+ delete(logBuffer.diskChunkCache.chunks, oldestOffset)
+ glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset)
+ }
+
+ // Store new chunk
+ logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{
+ startOffset: startOffset,
+ endOffset: endOffset,
+ messages: messages,
+ lastAccess: time.Now(),
+ }
+}
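+
+// Note on keying (illustrative): chunks are cached under their aligned start offset
+// (0, 1000, 2000, ... for the assumed chunkSize of 1000), so a read at offset 1764
+// looks up the chunk keyed 1000. Once the cache holds maxChunks entries, storing one
+// more evicts the chunk with the oldest lastAccess timestamp.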
+
+// extractMessagesFromCache extracts requested messages from a cached chunk
+// chunkMessages contains messages starting from the chunk's aligned start offset
+// We need to skip to the requested startOffset within the chunk
+func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) {
+ const chunkSize = 1000
+ chunkStartOffset := (startOffset / chunkSize) * chunkSize
+
+ // Calculate position within chunk
+ positionInChunk := int(startOffset - chunkStartOffset)
+
+ // Check if requested offset is within the chunk
+ if positionInChunk < 0 {
+ glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEFORE chunk start %d (positionInChunk=%d < 0)",
+ startOffset, chunkStartOffset, positionInChunk)
+ return nil, startOffset, fmt.Errorf("offset %d before chunk start %d", startOffset, chunkStartOffset)
+ }
+
+ if positionInChunk >= len(chunkMessages) {
+ // Requested offset is beyond the cached chunk
+ // This happens when disk files only contain partial data
+ // The requested offset might be in the gap between disk and memory
+ glog.Infof("[DiskCache] Requested offset %d is beyond cached chunk (chunkStart=%d, cachedSize=%d, positionInChunk=%d)",
+ startOffset, chunkStartOffset, len(chunkMessages), positionInChunk)
+ glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk",
+ chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset)
+
+ // Return empty (data not on disk) - caller will check memory buffers
+ return nil, startOffset, nil
+ }
+
+ // Extract messages starting from the requested position
+ messages := make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset := startOffset
+ totalBytes := 0
+
+ for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ {
+ entry := chunkMessages[i]
+ entrySize := proto.Size(entry)
+
+ // Check byte limit
+ if totalBytes > 0 && totalBytes+entrySize > maxBytes {
+ break
+ }
+
+ messages = append(messages, entry)
+ totalBytes += entrySize
+ nextOffset++
+ }
+
+ glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)",
+ len(messages), startOffset, nextOffset-1, totalBytes)
+
+ return messages, nextOffset, nil
+}
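+
+// Worked example (illustrative, mirroring the gap case handled above): with
+// chunkSize=1000, a request for startOffset=1764 maps to chunkStartOffset=1000 and
+// positionInChunk=764. If the cached chunk only holds offsets 1000-1763 (764 messages),
+// positionInChunk equals len(chunkMessages), so the function returns an empty slice and
+// a nil error, and the caller falls back to the in-memory buffers.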
+
+// parseMessagesFromBuffer parses messages from a buffer byte slice
+// This is thread-safe as it operates on a copy of the buffer
+func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) (
+ messages []*filer_pb.LogEntry,
+ nextOffset int64,
+ totalBytes int,
+ err error,
+) {
+ messages = make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset = startOffset
+ totalBytes = 0
+ foundStart := false
+
+ messagesInBuffer := 0
+ for pos := 0; pos+4 < len(buf) && len(messages) < maxMessages && totalBytes < maxBytes; {
+ // Read message size
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ // Incomplete message at end of buffer
+ glog.V(4).Infof("[parseMessages] Incomplete message at pos %d, size %d, bufLen %d",
+ pos, size, len(buf))
+ break
+ }
+
+ // Parse message
+ entryData := buf[pos+4 : pos+4+int(size)]
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Warningf("[parseMessages] Failed to unmarshal message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+
+ messagesInBuffer++
+
+ // Skip ahead until we find the first message at or after startOffset
+ if !foundStart {
+ if logEntry.Offset >= startOffset {
+ glog.Infof("[parseMessages] Found first message at/after startOffset %d: logEntry.Offset=%d", startOffset, logEntry.Offset)
+ foundStart = true
+ nextOffset = logEntry.Offset
+ } else {
+ // Skip messages before startOffset
+ glog.V(3).Infof("[parseMessages] Skipping message at offset %d (before startOffset %d)", logEntry.Offset, startOffset)
+ pos += 4 + int(size)
+ continue
+ }
+ }
+
+ // Collect messages at or after startOffset
+ if foundStart && logEntry.Offset >= startOffset {
+ glog.V(3).Infof("[parseMessages] Adding message at offset %d (count=%d)", logEntry.Offset, len(messages)+1)
+ messages = append(messages, logEntry)
+ totalBytes += 4 + int(size)
+ nextOffset = logEntry.Offset + 1
+ }
+
+ pos += 4 + int(size)
+ }
+
+ glog.Infof("[parseMessages] Parsed buffer: requested startOffset=%d, messagesInBuffer=%d, messagesReturned=%d, nextOffset=%d",
+ startOffset, messagesInBuffer, len(messages), nextOffset)
+
+ glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d",
+ len(messages), nextOffset, totalBytes)
+
+ return messages, nextOffset, totalBytes, nil
+}
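+
+// Framing sketch (illustrative only; entries stands for an assumed []*filer_pb.LogEntry
+// slice, and it assumes util.Uint32toBytes is the inverse of util.BytesToUint32 used
+// above): each entry in the buffer is a 4-byte length prefix followed by the
+// proto-marshaled LogEntry, which is the layout this parser walks.
+//
+//	var buf []byte
+//	sizeBuf := make([]byte, 4)
+//	for _, entry := range entries {
+//		data, _ := proto.Marshal(entry)
+//		util.Uint32toBytes(sizeBuf, uint32(len(data)))
+//		buf = append(buf, sizeBuf...)
+//		buf = append(buf, data...)
+//	}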
+
+// readMessagesFromDisk reads messages from disk using the ReadFromDiskFn
+func (logBuffer *LogBuffer) readMessagesFromDisk(startOffset int64, maxMessages int, maxBytes int, highWaterMark int64) (
+ messages []*filer_pb.LogEntry,
+ nextOffset int64,
+ highWaterMark2 int64,
+ endOfPartition bool,
+ err error,
+) {
+ if logBuffer.ReadFromDiskFn == nil {
+ return nil, startOffset, highWaterMark, true,
+ fmt.Errorf("no disk read function configured")
+ }
+
+ messages = make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset = startOffset
+ totalBytes := 0
+
+ // Use a simple callback to collect messages
+ collectFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ // Check limits
+ if len(messages) >= maxMessages {
+ return true, nil // Done
+ }
+
+ entrySize := 4 + len(logEntry.Data) + len(logEntry.Key)
+ if totalBytes+entrySize > maxBytes {
+ return true, nil // Done
+ }
+
+ // Only include messages at or after startOffset
+ if logEntry.Offset >= startOffset {
+ messages = append(messages, logEntry)
+ totalBytes += entrySize
+ nextOffset = logEntry.Offset + 1
+ }
+
+ return false, nil // Continue
+ }
+
+ // Read from disk
+ startPos := NewMessagePositionFromOffset(startOffset)
+ _, isDone, err := logBuffer.ReadFromDiskFn(startPos, 0, collectFn)
+
+ if err != nil {
+ glog.Warningf("[StatelessRead] Disk read error: %v", err)
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.V(4).Infof("[StatelessRead] Read %d messages from disk, nextOffset=%d, isDone=%v",
+ len(messages), nextOffset, isDone)
+
+ // If we read from disk and got no messages, and isDone is true, we're at the end
+ endOfPartition = isDone && len(messages) == 0
+
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+}
+
+// GetHighWaterMark returns the high water mark for this partition: the next offset to
+// be assigned, i.e. the last written offset + 1 (Kafka semantics)
+// This is a lightweight operation for clients to check partition state
+func (logBuffer *LogBuffer) GetHighWaterMark() int64 {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+ return logBuffer.offset
+}
+
+// GetLogStartOffset returns the earliest offset currently available in the in-memory buffer
+// This is useful for clients to know the lower bound of what a stateless read can serve from memory
+func (logBuffer *LogBuffer) GetLogStartOffset() int64 {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+
+ // Check if we have offset information
+ if !logBuffer.hasOffsets {
+ return 0
+ }
+
+ // Return the current buffer start offset - this is the earliest offset in memory RIGHT NOW
+ // For stateless fetch, we only return what's currently available in memory
+ // We don't check prevBuffers because they may be stale or getting flushed
+ return logBuffer.bufferStartOffset
+}
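+
+// Example (illustrative): if the in-memory buffer currently holds offsets 10-14,
+// GetLogStartOffset returns 10 and GetHighWaterMark returns 15 (the next offset to be
+// assigned), so the range a stateless read can serve from memory is [10, 15).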
+
+// WaitForDataWithTimeout waits up to maxWaitMs for data to be available at startOffset
+// Returns true if data became available, false if timeout
+// This allows "long poll" behavior for real-time consumers
+func (logBuffer *LogBuffer) WaitForDataWithTimeout(startOffset int64, maxWaitMs int) bool {
+ if maxWaitMs <= 0 {
+ return false
+ }
+
+ timeout := time.NewTimer(time.Duration(maxWaitMs) * time.Millisecond)
+ defer timeout.Stop()
+
+ // Register for notifications
+ notifyChan := logBuffer.RegisterSubscriber(fmt.Sprintf("fetch-%d", startOffset))
+ defer logBuffer.UnregisterSubscriber(fmt.Sprintf("fetch-%d", startOffset))
+
+ // Check if data is already available
+ logBuffer.RLock()
+ currentEnd := logBuffer.offset
+ logBuffer.RUnlock()
+
+ // offset is the next offset to be assigned, so data exists at startOffset only when currentEnd > startOffset
+ if currentEnd > startOffset {
+ return true
+ }
+
+ // Wait for notification or timeout
+ select {
+ case <-notifyChan:
+ // Data might be available now
+ logBuffer.RLock()
+ currentEnd := logBuffer.offset
+ logBuffer.RUnlock()
+ return currentEnd > startOffset
+ case <-timeout.C:
+ return false
+ }
+}
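+
+// Long-poll usage sketch (illustrative; offset, maxMessages, maxBytes and maxWaitMs are
+// assumed caller-side variables, not part of this API):
+//
+//	msgs, next, _, _, err := logBuffer.ReadMessagesAtOffset(offset, maxMessages, maxBytes)
+//	if err == nil && len(msgs) == 0 {
+//		if logBuffer.WaitForDataWithTimeout(offset, maxWaitMs) {
+//			msgs, next, _, _, err = logBuffer.ReadMessagesAtOffset(offset, maxMessages, maxBytes)
+//		}
+//	}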
diff --git a/weed/util/log_buffer/log_read_stateless_test.go b/weed/util/log_buffer/log_read_stateless_test.go
new file mode 100644
index 000000000..948a929ba
--- /dev/null
+++ b/weed/util/log_buffer/log_read_stateless_test.go
@@ -0,0 +1,372 @@
+package log_buffer
+
+import (
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestReadMessagesAtOffset_EmptyBuffer(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+ lb.bufferStartOffset = 0
+ lb.offset = 0 // Empty buffer
+
+ messages, nextOffset, hwm, endOfPartition, err := lb.ReadMessagesAtOffset(100, 10, 1024)
+
+ // Reading from future offset (100) when buffer is at 0
+ // Should return empty, no error
+ if err != nil {
+ t.Errorf("Expected no error for future offset, got %v", err)
+ }
+ if len(messages) != 0 {
+ t.Errorf("Expected 0 messages, got %d", len(messages))
+ }
+ if nextOffset != 100 {
+ t.Errorf("Expected nextOffset=100, got %d", nextOffset)
+ }
+ if !endOfPartition {
+ t.Error("Expected endOfPartition=true for future offset")
+ }
+ if hwm != 0 {
+ t.Errorf("Expected highWaterMark=0, got %d", hwm)
+ }
+}
+
+func TestReadMessagesAtOffset_SingleMessage(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add a message
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key1"),
+ Data: []byte("value1"),
+ Offset: 0,
+ }
+ lb.AddLogEntryToBuffer(entry)
+
+ // Read from offset 0
+ messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 1024)
+
+ if err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+ if len(messages) != 1 {
+ t.Errorf("Expected 1 message, got %d", len(messages))
+ }
+ if nextOffset != 1 {
+ t.Errorf("Expected nextOffset=1, got %d", nextOffset)
+ }
+ if !endOfPartition {
+ t.Error("Expected endOfPartition=true after reading all messages")
+ }
+ if messages[0].Offset != 0 {
+ t.Errorf("Expected message offset=0, got %d", messages[0].Offset)
+ }
+ if string(messages[0].Key) != "key1" {
+ t.Errorf("Expected key='key1', got '%s'", string(messages[0].Key))
+ }
+}
+
+func TestReadMessagesAtOffset_MultipleMessages(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add 5 messages
+ for i := 0; i < 5; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Read from offset 0, max 3 messages
+ messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(0, 3, 10240)
+
+ if err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+ if len(messages) != 3 {
+ t.Errorf("Expected 3 messages, got %d", len(messages))
+ }
+ if nextOffset != 3 {
+ t.Errorf("Expected nextOffset=3, got %d", nextOffset)
+ }
+
+ // Verify offsets are sequential
+ for i, msg := range messages {
+ if msg.Offset != int64(i) {
+ t.Errorf("Message %d: expected offset=%d, got %d", i, i, msg.Offset)
+ }
+ }
+}
+
+func TestReadMessagesAtOffset_StartFromMiddle(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add 10 messages (0-9)
+ for i := 0; i < 10; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Read from offset 5
+ messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(5, 3, 10240)
+
+ if err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+ if len(messages) != 3 {
+ t.Errorf("Expected 3 messages, got %d", len(messages))
+ }
+ if nextOffset != 8 {
+ t.Errorf("Expected nextOffset=8, got %d", nextOffset)
+ }
+
+ // Verify we got messages 5, 6, 7
+ expectedOffsets := []int64{5, 6, 7}
+ for i, msg := range messages {
+ if msg.Offset != expectedOffsets[i] {
+ t.Errorf("Message %d: expected offset=%d, got %d", i, expectedOffsets[i], msg.Offset)
+ }
+ }
+}
+
+func TestReadMessagesAtOffset_MaxBytesLimit(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add messages with 100 bytes each
+ for i := 0; i < 10; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: make([]byte, 100), // 100 bytes
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Request with max 250 bytes (should get ~2 messages)
+ messages, _, _, _, err := lb.ReadMessagesAtOffset(0, 100, 250)
+
+ if err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+
+ // Should get at least 1 message, but likely 2
+ if len(messages) == 0 {
+ t.Error("Expected at least 1 message")
+ }
+ if len(messages) > 3 {
+ t.Errorf("Expected max 3 messages with 250 byte limit, got %d", len(messages))
+ }
+}
+
+func TestReadMessagesAtOffset_ConcurrentReads(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add 100 messages
+ for i := 0; i < 100; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Start 10 concurrent readers at different offsets
+ done := make(chan bool, 10)
+
+ for reader := 0; reader < 10; reader++ {
+ startOffset := int64(reader * 10)
+ go func(offset int64) {
+ messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(offset, 5, 10240)
+
+ if err != nil {
+ t.Errorf("Reader at offset %d: unexpected error: %v", offset, err)
+ }
+ if len(messages) != 5 {
+ t.Errorf("Reader at offset %d: expected 5 messages, got %d", offset, len(messages))
+ }
+ if nextOffset != offset+5 {
+ t.Errorf("Reader at offset %d: expected nextOffset=%d, got %d", offset, offset+5, nextOffset)
+ }
+
+ // Verify sequential offsets
+ for i, msg := range messages {
+ expectedOffset := offset + int64(i)
+ if msg.Offset != expectedOffset {
+ t.Errorf("Reader at offset %d: message %d has offset %d, expected %d",
+ offset, i, msg.Offset, expectedOffset)
+ }
+ }
+
+ done <- true
+ }(startOffset)
+ }
+
+ // Wait for all readers
+ for i := 0; i < 10; i++ {
+ <-done
+ }
+}
+
+func TestReadMessagesAtOffset_FutureOffset(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add 5 messages (0-4)
+ for i := 0; i < 5; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // Try to read from offset 10 (future)
+ messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(10, 10, 10240)
+
+ if err != nil {
+ t.Errorf("Expected no error for future offset, got %v", err)
+ }
+ if len(messages) != 0 {
+ t.Errorf("Expected 0 messages for future offset, got %d", len(messages))
+ }
+ if nextOffset != 10 {
+ t.Errorf("Expected nextOffset=10, got %d", nextOffset)
+ }
+ if !endOfPartition {
+ t.Error("Expected endOfPartition=true for future offset")
+ }
+}
+
+func TestWaitForDataWithTimeout_DataAvailable(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Add message at offset 0
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: 0,
+ }
+ lb.AddLogEntryToBuffer(entry)
+
+ // Wait for data at offset 0 (should return immediately)
+ dataAvailable := lb.WaitForDataWithTimeout(0, 100)
+
+ if !dataAvailable {
+ t.Error("Expected data to be available at offset 0")
+ }
+}
+
+func TestWaitForDataWithTimeout_NoData(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+ lb.bufferStartOffset = 0
+ lb.offset = 0
+
+ // Don't add any messages, wait for offset 10
+
+ // Wait for data at offset 10 with short timeout
+ start := time.Now()
+ dataAvailable := lb.WaitForDataWithTimeout(10, 50)
+ elapsed := time.Since(start)
+
+ if dataAvailable {
+ t.Error("Expected no data to be available")
+ }
+ // Note: Actual wait time may be shorter if subscriber mechanism
+ // returns immediately. Just verify no data was returned.
+ t.Logf("Waited %v for timeout", elapsed)
+}
+
+func TestWaitForDataWithTimeout_DataArrives(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Start waiting in background
+ done := make(chan bool)
+ var dataAvailable bool
+
+ go func() {
+ dataAvailable = lb.WaitForDataWithTimeout(0, 500)
+ done <- true
+ }()
+
+ // Add data after 50ms
+ time.Sleep(50 * time.Millisecond)
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: 0,
+ }
+ lb.AddLogEntryToBuffer(entry)
+
+ // Wait for result
+ <-done
+
+ if !dataAvailable {
+ t.Error("Expected data to become available after being added")
+ }
+}
+
+func TestGetHighWaterMark(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+
+ // Initially should be 0
+ hwm := lb.GetHighWaterMark()
+ if hwm != 0 {
+ t.Errorf("Expected initial HWM=0, got %d", hwm)
+ }
+
+ // Add messages (offsets 0-4)
+ for i := 0; i < 5; i++ {
+ entry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: []byte("value"),
+ Offset: int64(i),
+ }
+ lb.AddLogEntryToBuffer(entry)
+ }
+
+ // HWM should be 5 (next offset to write, not last written offset)
+ // This matches Kafka semantics where HWM = last offset + 1
+ hwm = lb.GetHighWaterMark()
+ if hwm != 5 {
+ t.Errorf("Expected HWM=5 after adding 5 messages (0-4), got %d", hwm)
+ }
+}
+
+func TestGetLogStartOffset(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+ lb.hasOffsets = true
+ lb.bufferStartOffset = 10
+
+ lso := lb.GetLogStartOffset()
+ if lso != 10 {
+ t.Errorf("Expected LSO=10, got %d", lso)
+ }
+}
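+
+// Illustrative sketch (not an actual test in this change) of a paginated consumption
+// loop built on the APIs exercised above; buffer contents and batch sizes are assumed:
+//
+//	offset := lb.GetLogStartOffset()
+//	for offset < lb.GetHighWaterMark() {
+//		msgs, next, _, _, err := lb.ReadMessagesAtOffset(offset, 100, 1<<20)
+//		if err != nil || len(msgs) == 0 {
+//			break
+//		}
+//		// process msgs ...
+//		offset = next
+//	}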