Diffstat (limited to 'weed/util')
 weed/util/log_buffer/log_buffer.go                 | 114
 weed/util/log_buffer/log_buffer_flush_gap_test.go  | 680
 weed/util/log_buffer/log_read.go                   |   1
 weed/util/log_buffer/log_read_integration_test.go  | 353
 weed/util/log_buffer/log_read_stateless.go         | 639
 weed/util/log_buffer/log_read_stateless_test.go    | 372
6 files changed, 2133 insertions(+), 26 deletions(-)
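The diff below adds a stateless, Kafka-style read path (ReadMessagesAtOffset) and an LRU cache of disk chunks for historical reads. For orientation only, a consumer loop against the new API might look like the following sketch; the package name, the function pollPartition, the batch limits, and the retry/sleep handling are illustrative assumptions, not part of this change:

package example

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// pollPartition drains one partition's LogBuffer from startOffset until it is
// caught up, the way a stateless (Kafka-style) fetcher would use the new API.
func pollPartition(lb *log_buffer.LogBuffer, startOffset int64) error {
	offset := startOffset
	for {
		msgs, nextOffset, highWaterMark, endOfPartition, err := lb.ReadMessagesAtOffset(offset, 100, 4*1024*1024)
		if err != nil {
			return err // e.g. offset not in memory and no ReadFromDiskFn configured
		}
		for _, m := range msgs {
			_ = m.Offset // Kafka-style offset assigned at publish time
			_ = m.Data   // message payload
		}
		offset = nextOffset
		if endOfPartition && offset >= highWaterMark {
			return nil // caught up; a real consumer would poll again later
		}
		if len(msgs) == 0 {
			time.Sleep(10 * time.Millisecond) // nothing available yet at this offset
		}
	}
}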
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index aff8ec80b..692c47b44 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,8 +2,8 @@ package log_buffer
 import (
 	"bytes"
+	"fmt"
 	"math"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -33,6 +33,21 @@ type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int
 type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
 type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
 
+// DiskChunkCache caches chunks of historical data read from disk
+type DiskChunkCache struct {
+	mu        sync.RWMutex
+	chunks    map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
+	maxChunks int                        // Maximum number of chunks to cache
+}
+
+// CachedDiskChunk represents a cached chunk of disk data
+type CachedDiskChunk struct {
+	startOffset int64
+	endOffset   int64
+	messages    []*filer_pb.LogEntry
+	lastAccess  time.Time
+}
+
 type LogBuffer struct {
 	LastFlushTsNs int64
 	name          string
@@ -63,6 +78,8 @@ type LogBuffer struct {
 	hasOffsets        bool
 	lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
 	lastFlushedTime   atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
+	// Disk chunk cache for historical data reads
+	diskChunkCache *DiskChunkCache
 	sync.RWMutex
 }
@@ -81,6 +98,10 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
 		flushChan:     make(chan *dataToFlush, 256),
 		isStopping:    new(atomic.Bool),
 		offset:        0, // Will be initialized from existing data if available
+		diskChunkCache: &DiskChunkCache{
+			chunks:    make(map[int64]*CachedDiskChunk),
+			maxChunks: 16, // Cache up to 16 chunks (configurable)
+		},
 	}
 	lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
 	go lb.loopFlush()
@@ -359,17 +380,52 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
 	if logBuffer.LastTsNs.Load() >= processingTsNs {
 		processingTsNs = logBuffer.LastTsNs.Add(1)
 		ts = time.Unix(0, processingTsNs)
-		// Re-marshal with corrected timestamp
 		logEntry.TsNs = processingTsNs
-		logEntryData, _ = proto.Marshal(logEntry)
 	} else {
 		logBuffer.LastTsNs.Store(processingTsNs)
 	}
+	// CRITICAL FIX: Set the offset in the LogEntry before marshaling
+	// This ensures the flushed data contains the correct offset information
+	// Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
+	logEntry.Offset = logBuffer.offset
+
+	// DEBUG: Log data being added to buffer for GitHub Actions debugging
+	dataPreview := ""
+	if len(data) > 0 {
+		if len(data) <= 50 {
+			dataPreview = string(data)
+		} else {
+			dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
+		}
+	}
+	glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q",
+		logBuffer.name, logBuffer.offset, len(data), dataPreview)
+
+	// Marshal with correct timestamp and offset
+	logEntryData, _ = proto.Marshal(logEntry)
+
 	size := len(logEntryData)
 
 	if logBuffer.pos == 0 {
 		logBuffer.startTime = ts
+		// Reset offset tracking for new buffer
+		logBuffer.hasOffsets = false
+	}
+
+	// Track offset ranges for Kafka integration
+	// CRITICAL FIX: Track the current offset being written
+	if !logBuffer.hasOffsets {
+		logBuffer.minOffset = logBuffer.offset
+		logBuffer.maxOffset = logBuffer.offset
+		logBuffer.hasOffsets = true
+	} else {
+		if logBuffer.offset < logBuffer.minOffset {
+			logBuffer.minOffset = logBuffer.offset
+		}
+		if logBuffer.offset > logBuffer.maxOffset {
+			logBuffer.maxOffset = logBuffer.offset
+		}
 	}
 
 	if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
@@ -397,6 +453,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
 	copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
 
 	logBuffer.pos += size + 4
+	logBuffer.offset++
 }
 
 func (logBuffer *LogBuffer) IsStopping() bool {
@@ -540,11 +597,29 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
 		logBuffer.hasOffsets = false
 		logBuffer.minOffset = 0
 		logBuffer.maxOffset = 0
+
+		// CRITICAL FIX: Invalidate disk cache chunks after flush
+		// The cache may contain stale data from before this flush
+		// Invalidating ensures consumers will re-read fresh data from disk after flush
+		logBuffer.invalidateAllDiskCacheChunks()
+
 		return d
 	}
 	return nil
 }
 
+// invalidateAllDiskCacheChunks clears all cached disk chunks
+// This should be called after a buffer flush to ensure consumers read fresh data from disk
+func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
+	logBuffer.diskChunkCache.mu.Lock()
+	defer logBuffer.diskChunkCache.mu.Unlock()
+
+	if len(logBuffer.diskChunkCache.chunks) > 0 {
+		glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks))
+		logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
+	}
+}
+
 func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
 	return logBuffer.startTime
 }
@@ -570,12 +645,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 	if isOffsetBased {
 		requestedOffset := lastReadPosition.Offset
 
-		// DEBUG: Log buffer state for _schemas topic
-		if strings.Contains(logBuffer.name, "_schemas") {
-			glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d",
-				requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load())
-		}
-
 		// Check if the requested offset is in the current buffer range
 		if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
 			// If current buffer is empty (pos=0), check if data is on disk or not yet written
@@ -593,10 +662,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
 				// Case 3: try disk read (historical data might exist)
 				if requestedOffset < logBuffer.offset {
 					// Data was in the buffer range but buffer is now empty = flushed to disk
-					if strings.Contains(logBuffer.name, "_schemas") {
-						glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)",
-							requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset)
-					}
 					return nil, -2, ResumeFromDiskError
 				}
 				// requestedOffset == logBuffer.offset: Current position
 				// (historical data might exist from previous runs)
 				if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
 					// Initial state: try disk read before waiting for new data
-					if strings.Contains(logBuffer.name, "_schemas") {
-						glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0")
-					}
 					return nil, -2, ResumeFromDiskError
 				}
 				// Otherwise, wait for new data to arrive
-				if strings.Contains(logBuffer.name, "_schemas") {
-					glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset)
-				}
 				return nil, logBuffer.offset, nil
 			}
-			if strings.Contains(logBuffer.name, "_schemas") {
-				glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos)
-			}
 			return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
 		}
@@ -661,25 +717,31 @@
 	// if td < tm, case 2.3
 	// read from disk again
 	var tsMemory time.Time
-	var tsBatchIndex int64
 	if !logBuffer.startTime.IsZero() {
 		tsMemory = logBuffer.startTime
-		tsBatchIndex = logBuffer.offset
 	}
 	for _, prevBuf := range logBuffer.prevBuffers.buffers {
 		if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
 			tsMemory = prevBuf.startTime
-			tsBatchIndex = prevBuf.offset
 		}
 	}
 	if tsMemory.IsZero() { // case 2.2
 		return nil, -2, nil
-	} else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+	} else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
+		// CRITICAL FIX: For time-based reads, only check timestamp for disk reads
+		// Don't use offset comparisons as they're not meaningful for time-based subscriptions
+
 		// Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
 		// This handles queries that want to read all data without knowing the exact start time
 		if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
 			// Start from the beginning of memory
 			// Fall through to case 2.1 to read from earliest buffer
+		} else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) {
+			// CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory,
+			// it's likely a race between starting to read and first message being written
+			// Fall through to case 2.1 to read from earliest buffer instead of triggering disk read
+			glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory",
+				lastReadPosition.Time, tsMemory)
 		} else {
 			// Data not in memory buffers - read from disk
 			glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",
diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go
new file mode 100644
index 000000000..63d344b1a
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go
@@ -0,0 +1,680 @@
+package log_buffer
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"google.golang.org/protobuf/proto"
+)
+
+// TestFlushOffsetGap_ReproduceDataLoss reproduces the critical bug where messages
+// are lost in the gap between flushed disk data and in-memory buffer.
+//
+// OBSERVED BEHAVIOR FROM LOGS:
+//	Request offset: 1764
+//	Disk contains: 1000-1763 (764 messages)
+//	Memory buffer starts at: 1800
+//	Gap: 1764-1799 (36 messages) ← MISSING!
+//
+// This test verifies:
+// 1. All messages sent to buffer are accounted for
+// 2. No gaps exist between disk and memory offsets
+// 3.
Flushed data and in-memory data have continuous offset ranges +func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { + var flushedMessages []*filer_pb.LogEntry + var flushMu sync.Mutex + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf)) + + // Parse and store flushed messages + flushMu.Lock() + defer flushMu.Unlock() + + // Parse buffer to extract messages + parsedCount := 0 + for pos := 0; pos+4 < len(buf); { + if pos+4 > len(buf) { + break + } + + size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3]) + if pos+4+int(size) > len(buf) { + break + } + + entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, logEntry); err == nil { + flushedMessages = append(flushedMessages, logEntry) + parsedCount++ + } + + pos += 4 + int(size) + } + + t.Logf(" Parsed %d messages from flush buffer", parsedCount) + } + + logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + // Send 100 messages + messageCount := 100 + t.Logf("Sending %d messages...", messageCount) + + for i := 0; i < messageCount; i++ { + logBuffer.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Value: []byte(fmt.Sprintf("message-%d", i)), + TsNs: time.Now().UnixNano(), + }) + } + + // Force flush multiple times to simulate real workload + t.Logf("Forcing flush...") + logBuffer.ForceFlush() + + // Add more messages after flush + for i := messageCount; i < messageCount+50; i++ { + logBuffer.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Value: []byte(fmt.Sprintf("message-%d", i)), + TsNs: time.Now().UnixNano(), + }) + } + + // Force another flush + logBuffer.ForceFlush() + time.Sleep(200 * time.Millisecond) // Wait for flush to complete + + // Now check the buffer state + logBuffer.RLock() + bufferStartOffset := logBuffer.bufferStartOffset + currentOffset := logBuffer.offset + pos := logBuffer.pos + logBuffer.RUnlock() + + flushMu.Lock() + flushedCount := len(flushedMessages) + var maxFlushedOffset int64 = -1 + var minFlushedOffset int64 = -1 + if flushedCount > 0 { + minFlushedOffset = flushedMessages[0].Offset + maxFlushedOffset = flushedMessages[flushedCount-1].Offset + } + flushMu.Unlock() + + t.Logf("\nBUFFER STATE AFTER FLUSH:") + t.Logf(" bufferStartOffset: %d", bufferStartOffset) + t.Logf(" currentOffset (HWM): %d", currentOffset) + t.Logf(" pos (bytes in buffer): %d", pos) + t.Logf(" Messages sent: %d (offsets 0-%d)", messageCount+50, messageCount+49) + t.Logf(" Messages flushed to disk: %d (offsets %d-%d)", flushedCount, minFlushedOffset, maxFlushedOffset) + + // CRITICAL CHECK: Is there a gap between flushed data and memory buffer? 
+ if flushedCount > 0 && maxFlushedOffset >= 0 { + gap := bufferStartOffset - (maxFlushedOffset + 1) + + t.Logf("\nOFFSET CONTINUITY CHECK:") + t.Logf(" Last flushed offset: %d", maxFlushedOffset) + t.Logf(" Buffer starts at: %d", bufferStartOffset) + t.Logf(" Gap: %d offsets", gap) + + if gap > 0 { + t.Errorf("❌ CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!") + t.Errorf(" Disk has offsets %d-%d", minFlushedOffset, maxFlushedOffset) + t.Errorf(" Memory buffer starts at: %d", bufferStartOffset) + t.Errorf(" MISSING OFFSETS: %d-%d (%d messages)", maxFlushedOffset+1, bufferStartOffset-1, gap) + t.Errorf(" These messages are LOST - neither on disk nor in memory!") + } else if gap < 0 { + t.Errorf("❌ OFFSET OVERLAP: Memory buffer starts BEFORE last flushed offset!") + t.Errorf(" This indicates data corruption or race condition") + } else { + t.Logf("✅ PASS: No gap detected - offsets are continuous") + } + + // Check if we can read all expected offsets + t.Logf("\nREADABILITY CHECK:") + for testOffset := int64(0); testOffset < currentOffset; testOffset += 10 { + // Try to read from buffer + requestPosition := NewMessagePositionFromOffset(testOffset) + buf, _, err := logBuffer.ReadFromBuffer(requestPosition) + + isReadable := (buf != nil && len(buf.Bytes()) > 0) || err == ResumeFromDiskError + status := "✅" + if !isReadable && err == nil { + status = "❌ NOT READABLE" + } + + t.Logf(" Offset %d: %s (buf=%v, err=%v)", testOffset, status, buf != nil, err) + + // If offset is in the gap, it should fail to read + if flushedCount > 0 && testOffset > maxFlushedOffset && testOffset < bufferStartOffset { + if isReadable { + t.Errorf(" Unexpected: Offset %d in gap range should NOT be readable!", testOffset) + } else { + t.Logf(" Expected: Offset %d in gap is not readable (data lost)", testOffset) + } + } + } + } + + // Check that all sent messages are accounted for + expectedMessageCount := messageCount + 50 + messagesInMemory := int(currentOffset - bufferStartOffset) + totalAccountedFor := flushedCount + messagesInMemory + + t.Logf("\nMESSAGE ACCOUNTING:") + t.Logf(" Expected: %d messages", expectedMessageCount) + t.Logf(" Flushed to disk: %d", flushedCount) + t.Logf(" In memory buffer: %d (offset range %d-%d)", messagesInMemory, bufferStartOffset, currentOffset-1) + t.Logf(" Total accounted for: %d", totalAccountedFor) + t.Logf(" Missing: %d messages", expectedMessageCount-totalAccountedFor) + + if totalAccountedFor < expectedMessageCount { + t.Errorf("❌ DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor) + } else { + t.Logf("✅ All messages accounted for") + } +} + +// TestFlushOffsetGap_CheckPrevBuffers tests if messages might be stuck in prevBuffers +// instead of being properly flushed to disk. 
+func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { + var flushCount int + var flushMu sync.Mutex + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + flushMu.Lock() + flushCount++ + count := flushCount + flushMu.Unlock() + + t.Logf("FLUSH #%d: minOffset=%d maxOffset=%d size=%d bytes", count, minOffset, maxOffset, len(buf)) + } + + logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + // Send messages in batches with flushes in between + for batch := 0; batch < 5; batch++ { + t.Logf("\nBatch %d:", batch) + + // Send 20 messages + for i := 0; i < 20; i++ { + offset := int64(batch*20 + i) + logBuffer.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte(fmt.Sprintf("key-%d", offset)), + Value: []byte(fmt.Sprintf("message-%d", offset)), + TsNs: time.Now().UnixNano(), + }) + } + + // Check state before flush + logBuffer.RLock() + beforeFlushOffset := logBuffer.offset + beforeFlushStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + + // Force flush + logBuffer.ForceFlush() + time.Sleep(50 * time.Millisecond) + + // Check state after flush + logBuffer.RLock() + afterFlushOffset := logBuffer.offset + afterFlushStart := logBuffer.bufferStartOffset + prevBufferCount := len(logBuffer.prevBuffers.buffers) + + // Check prevBuffers state + t.Logf(" Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart) + t.Logf(" After flush: offset=%d, bufferStartOffset=%d, prevBuffers=%d", + afterFlushOffset, afterFlushStart, prevBufferCount) + + // Check each prevBuffer + for i, prevBuf := range logBuffer.prevBuffers.buffers { + if prevBuf.size > 0 { + t.Logf(" prevBuffer[%d]: offsets %d-%d, size=%d bytes (NOT FLUSHED!)", + i, prevBuf.startOffset, prevBuf.offset, prevBuf.size) + } + } + logBuffer.RUnlock() + + // CRITICAL: Check if bufferStartOffset advanced correctly + expectedNewStart := beforeFlushOffset + if afterFlushStart != expectedNewStart { + t.Errorf(" ❌ bufferStartOffset mismatch!") + t.Errorf(" Expected: %d (= offset before flush)", expectedNewStart) + t.Errorf(" Actual: %d", afterFlushStart) + t.Errorf(" Gap: %d offsets", expectedNewStart-afterFlushStart) + } + } +} + +// TestFlushOffsetGap_ConcurrentWriteAndFlush tests for race conditions +// between writing new messages and flushing old ones. 
+func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { + var allFlushedOffsets []int64 + var flushMu sync.Mutex + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf)) + + flushMu.Lock() + // Record the offset range that was flushed + for offset := minOffset; offset <= maxOffset; offset++ { + allFlushedOffsets = append(allFlushedOffsets, offset) + } + flushMu.Unlock() + } + + logBuffer := NewLogBuffer("test", 50*time.Millisecond, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + // Concurrently write messages and force flushes + var wg sync.WaitGroup + + // Writer goroutine + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 200; i++ { + logBuffer.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Value: []byte(fmt.Sprintf("message-%d", i)), + TsNs: time.Now().UnixNano(), + }) + if i%50 == 0 { + time.Sleep(10 * time.Millisecond) + } + } + }() + + // Flusher goroutine + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 5; i++ { + time.Sleep(30 * time.Millisecond) + logBuffer.ForceFlush() + } + }() + + wg.Wait() + time.Sleep(200 * time.Millisecond) // Wait for final flush + + // Check final state + logBuffer.RLock() + finalOffset := logBuffer.offset + finalBufferStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + + flushMu.Lock() + flushedCount := len(allFlushedOffsets) + flushMu.Unlock() + + expectedCount := int(finalOffset) + inMemory := int(finalOffset - finalBufferStart) + totalAccountedFor := flushedCount + inMemory + + t.Logf("\nFINAL STATE:") + t.Logf(" Total messages sent: %d (offsets 0-%d)", expectedCount, expectedCount-1) + t.Logf(" Flushed to disk: %d", flushedCount) + t.Logf(" In memory: %d (offsets %d-%d)", inMemory, finalBufferStart, finalOffset-1) + t.Logf(" Total accounted: %d", totalAccountedFor) + t.Logf(" Missing: %d", expectedCount-totalAccountedFor) + + if totalAccountedFor < expectedCount { + t.Errorf("❌ DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor) + } +} + +// TestFlushOffsetGap_ProductionScenario reproduces the actual production scenario +// where the broker uses AddLogEntryToBuffer with explicit Kafka offsets. +// This simulates leader publishing with offset assignment. 
+func TestFlushOffsetGap_ProductionScenario(t *testing.T) { + var flushedData []struct { + minOffset int64 + maxOffset int64 + messages []*filer_pb.LogEntry + } + var flushMu sync.Mutex + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Parse messages from buffer + messages := []*filer_pb.LogEntry{} + for pos := 0; pos+4 < len(buf); { + size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3]) + if pos+4+int(size) > len(buf) { + break + } + entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, logEntry); err == nil { + messages = append(messages, logEntry) + } + pos += 4 + int(size) + } + + flushMu.Lock() + flushedData = append(flushedData, struct { + minOffset int64 + maxOffset int64 + messages []*filer_pb.LogEntry + }{minOffset, maxOffset, messages}) + flushMu.Unlock() + + t.Logf("FLUSH: minOffset=%d maxOffset=%d, parsed %d messages", minOffset, maxOffset, len(messages)) + } + + logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + // Simulate broker behavior: assign Kafka offsets and add to buffer + // This is what PublishWithOffset() does + nextKafkaOffset := int64(0) + + // Round 1: Add 50 messages with Kafka offsets 0-49 + t.Logf("\n=== ROUND 1: Adding messages 0-49 ===") + for i := 0; i < 50; i++ { + logEntry := &filer_pb.LogEntry{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Data: []byte(fmt.Sprintf("message-%d", i)), + TsNs: time.Now().UnixNano(), + Offset: nextKafkaOffset, // Explicit Kafka offset + } + logBuffer.AddLogEntryToBuffer(logEntry) + nextKafkaOffset++ + } + + // Check buffer state before flush + logBuffer.RLock() + beforeFlushOffset := logBuffer.offset + beforeFlushStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + t.Logf("Before flush: logBuffer.offset=%d, bufferStartOffset=%d, nextKafkaOffset=%d", + beforeFlushOffset, beforeFlushStart, nextKafkaOffset) + + // Flush + logBuffer.ForceFlush() + time.Sleep(100 * time.Millisecond) + + // Check buffer state after flush + logBuffer.RLock() + afterFlushOffset := logBuffer.offset + afterFlushStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + t.Logf("After flush: logBuffer.offset=%d, bufferStartOffset=%d", + afterFlushOffset, afterFlushStart) + + // Round 2: Add another 50 messages with Kafka offsets 50-99 + t.Logf("\n=== ROUND 2: Adding messages 50-99 ===") + for i := 0; i < 50; i++ { + logEntry := &filer_pb.LogEntry{ + Key: []byte(fmt.Sprintf("key-%d", 50+i)), + Data: []byte(fmt.Sprintf("message-%d", 50+i)), + TsNs: time.Now().UnixNano(), + Offset: nextKafkaOffset, + } + logBuffer.AddLogEntryToBuffer(logEntry) + nextKafkaOffset++ + } + + logBuffer.ForceFlush() + time.Sleep(100 * time.Millisecond) + + // Verification: Check if all Kafka offsets are accounted for + flushMu.Lock() + t.Logf("\n=== VERIFICATION ===") + t.Logf("Expected Kafka offsets: 0-%d", nextKafkaOffset-1) + + allOffsets := make(map[int64]bool) + for flushIdx, flush := range flushedData { + t.Logf("Flush #%d: minOffset=%d, maxOffset=%d, messages=%d", + flushIdx, flush.minOffset, flush.maxOffset, len(flush.messages)) + + for _, msg := range flush.messages { + if allOffsets[msg.Offset] { + t.Errorf(" ❌ DUPLICATE: Offset %d appears multiple times!", msg.Offset) + } + allOffsets[msg.Offset] = true + } + } + flushMu.Unlock() + + // Check for missing offsets + missingOffsets := []int64{} + for expectedOffset := int64(0); 
expectedOffset < nextKafkaOffset; expectedOffset++ { + if !allOffsets[expectedOffset] { + missingOffsets = append(missingOffsets, expectedOffset) + } + } + + if len(missingOffsets) > 0 { + t.Errorf("\n❌ MISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets)) + if len(missingOffsets) <= 20 { + t.Errorf("Missing: %v", missingOffsets) + } else { + t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20) + } + t.Errorf("\nThis reproduces the production bug!") + } else { + t.Logf("\n✅ SUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1) + } + + // Check buffer offset consistency + logBuffer.RLock() + finalOffset := logBuffer.offset + finalBufferStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + + t.Logf("\nFinal buffer state:") + t.Logf(" logBuffer.offset: %d", finalOffset) + t.Logf(" bufferStartOffset: %d", finalBufferStart) + t.Logf(" Expected (nextKafkaOffset): %d", nextKafkaOffset) + + if finalOffset != nextKafkaOffset { + t.Errorf("❌ logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset) + } +} + +// TestFlushOffsetGap_ConcurrentReadDuringFlush tests if concurrent reads +// during flush can cause messages to be missed. +func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { + var flushedOffsets []int64 + var flushMu sync.Mutex + + readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + // Simulate reading from disk - return flushed offsets + flushMu.Lock() + defer flushMu.Unlock() + + for _, offset := range flushedOffsets { + if offset >= startPosition.Offset { + logEntry := &filer_pb.LogEntry{ + Key: []byte(fmt.Sprintf("key-%d", offset)), + Data: []byte(fmt.Sprintf("message-%d", offset)), + TsNs: time.Now().UnixNano(), + Offset: offset, + } + isDone, err := eachLogEntryFn(logEntry) + if err != nil || isDone { + return NewMessagePositionFromOffset(offset + 1), isDone, err + } + } + } + return startPosition, false, nil + } + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Parse and store flushed offsets + flushMu.Lock() + defer flushMu.Unlock() + + for pos := 0; pos+4 < len(buf); { + size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3]) + if pos+4+int(size) > len(buf) { + break + } + entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, logEntry); err == nil { + flushedOffsets = append(flushedOffsets, logEntry.Offset) + } + pos += 4 + int(size) + } + + t.Logf("FLUSH: Stored %d offsets to disk (minOffset=%d, maxOffset=%d)", + len(flushedOffsets), minOffset, maxOffset) + } + + logBuffer := NewLogBuffer("test", time.Hour, flushFn, readFromDiskFn, nil) + defer logBuffer.ShutdownLogBuffer() + + // Add 100 messages + t.Logf("Adding 100 messages...") + for i := int64(0); i < 100; i++ { + logEntry := &filer_pb.LogEntry{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Data: []byte(fmt.Sprintf("message-%d", i)), + TsNs: time.Now().UnixNano(), + Offset: i, + } + logBuffer.AddLogEntryToBuffer(logEntry) + } + + // Flush (moves data to disk) + t.Logf("Flushing...") + logBuffer.ForceFlush() + time.Sleep(100 * time.Millisecond) + + // Now try to read all messages using ReadMessagesAtOffset + t.Logf("\nReading messages from offset 0...") + messages, nextOffset, hwm, endOfPartition, err := logBuffer.ReadMessagesAtOffset(0, 1000, 
1024*1024) + + t.Logf("Read result: messages=%d, nextOffset=%d, hwm=%d, endOfPartition=%v, err=%v", + len(messages), nextOffset, hwm, endOfPartition, err) + + // Verify all offsets can be read + readOffsets := make(map[int64]bool) + for _, msg := range messages { + readOffsets[msg.Offset] = true + } + + missingOffsets := []int64{} + for expectedOffset := int64(0); expectedOffset < 100; expectedOffset++ { + if !readOffsets[expectedOffset] { + missingOffsets = append(missingOffsets, expectedOffset) + } + } + + if len(missingOffsets) > 0 { + t.Errorf("❌ MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets)) + if len(missingOffsets) <= 20 { + t.Errorf("Missing: %v", missingOffsets) + } else { + t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20) + } + } else { + t.Logf("✅ All 100 offsets can be read after flush") + } +} + +// TestFlushOffsetGap_ForceFlushAdvancesBuffer tests if ForceFlush +// properly advances bufferStartOffset after flushing. +func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) { + flushedRanges := []struct{ min, max int64 }{} + var flushMu sync.Mutex + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + flushMu.Lock() + flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset}) + flushMu.Unlock() + t.Logf("FLUSH: offsets %d-%d", minOffset, maxOffset) + } + + logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) // Long interval, manual flush only + defer logBuffer.ShutdownLogBuffer() + + // Send messages, flush, check state - repeat + for round := 0; round < 3; round++ { + t.Logf("\n=== ROUND %d ===", round) + + // Check state before adding messages + logBuffer.RLock() + beforeOffset := logBuffer.offset + beforeStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + + t.Logf("Before adding: offset=%d, bufferStartOffset=%d", beforeOffset, beforeStart) + + // Add 10 messages + for i := 0; i < 10; i++ { + logBuffer.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte(fmt.Sprintf("round-%d-msg-%d", round, i)), + Value: []byte(fmt.Sprintf("data-%d-%d", round, i)), + TsNs: time.Now().UnixNano(), + }) + } + + // Check state after adding + logBuffer.RLock() + afterAddOffset := logBuffer.offset + afterAddStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + + t.Logf("After adding: offset=%d, bufferStartOffset=%d", afterAddOffset, afterAddStart) + + // Force flush + t.Logf("Forcing flush...") + logBuffer.ForceFlush() + time.Sleep(100 * time.Millisecond) + + // Check state after flush + logBuffer.RLock() + afterFlushOffset := logBuffer.offset + afterFlushStart := logBuffer.bufferStartOffset + logBuffer.RUnlock() + + t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart) + + // CRITICAL CHECK: bufferStartOffset should advance to where offset was before flush + if afterFlushStart != afterAddOffset { + t.Errorf("❌ FLUSH BUG: bufferStartOffset did NOT advance correctly!") + t.Errorf(" Expected bufferStartOffset=%d (= offset after add)", afterAddOffset) + t.Errorf(" Actual bufferStartOffset=%d", afterFlushStart) + t.Errorf(" Gap: %d offsets WILL BE LOST", afterAddOffset-afterFlushStart) + } else { + t.Logf("✅ bufferStartOffset correctly advanced to %d", afterFlushStart) + } + } + + // Final verification: check all offset ranges are continuous + flushMu.Lock() + t.Logf("\n=== FLUSHED RANGES ===") + for i, r := range flushedRanges { + t.Logf("Flush #%d: offsets %d-%d", i, r.min, r.max) + + 
// Check continuity with previous flush + if i > 0 { + prevMax := flushedRanges[i-1].max + currentMin := r.min + gap := currentMin - (prevMax + 1) + + if gap > 0 { + t.Errorf("❌ GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap) + } else if gap < 0 { + t.Errorf("❌ OVERLAP between flush #%d and #%d: %d offsets duplicated!", i-1, i, -gap) + } else { + t.Logf(" ✅ Continuous with previous flush") + } + } + } + flushMu.Unlock() +} + diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 77f03ddb8..3b7b99ada 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -355,6 +355,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star continue } + glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key)) // Handle offset-based filtering for offset-based start positions diff --git a/weed/util/log_buffer/log_read_integration_test.go b/weed/util/log_buffer/log_read_integration_test.go new file mode 100644 index 000000000..38549b9f7 --- /dev/null +++ b/weed/util/log_buffer/log_read_integration_test.go @@ -0,0 +1,353 @@ +package log_buffer + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// TestConcurrentProducerConsumer simulates the integration test scenario: +// - One producer writing messages continuously +// - Multiple consumers reading from different offsets +// - Consumers reading sequentially (like Kafka consumers) +func TestConcurrentProducerConsumer(t *testing.T) { + lb := NewLogBuffer("integration-test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + const numMessages = 1000 + const numConsumers = 2 + const messagesPerConsumer = numMessages / numConsumers + + // Start producer + producerDone := make(chan bool) + go func() { + for i := 0; i < numMessages; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + time.Sleep(1 * time.Millisecond) // Simulate production rate + } + producerDone <- true + }() + + // Start consumers + consumerWg := sync.WaitGroup{} + consumerErrors := make(chan error, numConsumers) + consumedCounts := make([]int64, numConsumers) + + for consumerID := 0; consumerID < numConsumers; consumerID++ { + consumerWg.Add(1) + go func(id int, startOffset int64, endOffset int64) { + defer consumerWg.Done() + + currentOffset := startOffset + for currentOffset < endOffset { + // Read 10 messages at a time (like integration test) + messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240) + if err != nil { + consumerErrors <- err + return + } + + if len(messages) == 0 { + // No data yet, wait a bit + time.Sleep(5 * time.Millisecond) + continue + } + + // Count only messages in this consumer's assigned range + messagesInRange := 0 + for i, msg := range messages { + if msg.Offset >= startOffset && msg.Offset < endOffset { + messagesInRange++ + expectedOffset := currentOffset + int64(i) + if msg.Offset != expectedOffset { + t.Errorf("Consumer %d: Expected offset %d, got %d", id, expectedOffset, msg.Offset) + } + } + } + + atomic.AddInt64(&consumedCounts[id], int64(messagesInRange)) + currentOffset = nextOffset + } + }(consumerID, int64(consumerID*messagesPerConsumer), int64((consumerID+1)*messagesPerConsumer)) + } + + // Wait for producer to finish + <-producerDone + + // Wait for 
consumers (with timeout) + done := make(chan bool) + go func() { + consumerWg.Wait() + done <- true + }() + + select { + case <-done: + // Success + case err := <-consumerErrors: + t.Fatalf("Consumer error: %v", err) + case <-time.After(10 * time.Second): + t.Fatal("Timeout waiting for consumers to finish") + } + + // Verify all messages were consumed + totalConsumed := int64(0) + for i, count := range consumedCounts { + t.Logf("Consumer %d consumed %d messages", i, count) + totalConsumed += count + } + + if totalConsumed != numMessages { + t.Errorf("Expected to consume %d messages, but consumed %d", numMessages, totalConsumed) + } +} + +// TestBackwardSeeksWhileProducing simulates consumer rebalancing where +// consumers seek backward to earlier offsets while producer is still writing +func TestBackwardSeeksWhileProducing(t *testing.T) { + lb := NewLogBuffer("backward-seek-test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + const numMessages = 500 + const numSeeks = 10 + + // Start producer + producerDone := make(chan bool) + go func() { + for i := 0; i < numMessages; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + time.Sleep(1 * time.Millisecond) + } + producerDone <- true + }() + + // Consumer that seeks backward periodically + consumerDone := make(chan bool) + readOffsets := make(map[int64]int) // Track how many times each offset was read + + go func() { + currentOffset := int64(0) + seeksRemaining := numSeeks + + for currentOffset < numMessages { + // Read some messages + messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240) + if err != nil { + // For stateless reads, "offset out of range" means data not in memory yet + // This is expected when reading historical data or before production starts + time.Sleep(5 * time.Millisecond) + continue + } + + if len(messages) == 0 { + // No data available yet or caught up to producer + if !endOfPartition { + // Data might be coming, wait + time.Sleep(5 * time.Millisecond) + } else { + // At end of partition, wait for more production + time.Sleep(5 * time.Millisecond) + } + continue + } + + // Track read offsets + for _, msg := range messages { + readOffsets[msg.Offset]++ + } + + // Periodically seek backward (simulating rebalancing) + if seeksRemaining > 0 && nextOffset > 50 && nextOffset%100 == 0 { + seekOffset := nextOffset - 20 + t.Logf("Seeking backward from %d to %d", nextOffset, seekOffset) + currentOffset = seekOffset + seeksRemaining-- + } else { + currentOffset = nextOffset + } + } + + consumerDone <- true + }() + + // Wait for both + <-producerDone + <-consumerDone + + // Verify each offset was read at least once + for i := int64(0); i < numMessages; i++ { + if readOffsets[i] == 0 { + t.Errorf("Offset %d was never read", i) + } + } + + t.Logf("Total unique offsets read: %d out of %d", len(readOffsets), numMessages) +} + +// TestHighConcurrencyReads simulates multiple consumers reading from +// different offsets simultaneously (stress test) +func TestHighConcurrencyReads(t *testing.T) { + lb := NewLogBuffer("high-concurrency-test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + const numMessages = 1000 + const numReaders = 10 + + // Pre-populate buffer + for i := 0; i < numMessages; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + 
lb.AddLogEntryToBuffer(entry) + } + + // Start many concurrent readers at different offsets + wg := sync.WaitGroup{} + errors := make(chan error, numReaders) + + for reader := 0; reader < numReaders; reader++ { + wg.Add(1) + go func(startOffset int64) { + defer wg.Done() + + // Read 100 messages from this offset + currentOffset := startOffset + readCount := 0 + + for readCount < 100 && currentOffset < numMessages { + messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240) + if err != nil { + errors <- err + return + } + + // Verify offsets are sequential + for i, msg := range messages { + expected := currentOffset + int64(i) + if msg.Offset != expected { + t.Errorf("Reader at %d: expected offset %d, got %d", startOffset, expected, msg.Offset) + } + } + + readCount += len(messages) + currentOffset = nextOffset + } + }(int64(reader * 10)) + } + + // Wait with timeout + done := make(chan bool) + go func() { + wg.Wait() + done <- true + }() + + select { + case <-done: + // Success + case err := <-errors: + t.Fatalf("Reader error: %v", err) + case <-time.After(10 * time.Second): + t.Fatal("Timeout waiting for readers") + } +} + +// TestRepeatedReadsAtSameOffset simulates what happens when Kafka +// consumer re-fetches the same offset multiple times (due to timeouts or retries) +func TestRepeatedReadsAtSameOffset(t *testing.T) { + lb := NewLogBuffer("repeated-reads-test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + const numMessages = 100 + + // Pre-populate buffer + for i := 0; i < numMessages; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // Read the same offset multiple times concurrently + const numReads = 10 + const testOffset = int64(50) + + wg := sync.WaitGroup{} + results := make([][]*filer_pb.LogEntry, numReads) + + for i := 0; i < numReads; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + messages, _, _, _, err := lb.ReadMessagesAtOffset(testOffset, 10, 10240) + if err != nil { + t.Errorf("Read %d error: %v", idx, err) + return + } + results[idx] = messages + }(i) + } + + wg.Wait() + + // Verify all reads returned the same data + firstRead := results[0] + for i := 1; i < numReads; i++ { + if len(results[i]) != len(firstRead) { + t.Errorf("Read %d returned %d messages, expected %d", i, len(results[i]), len(firstRead)) + } + + for j := range results[i] { + if results[i][j].Offset != firstRead[j].Offset { + t.Errorf("Read %d message %d has offset %d, expected %d", + i, j, results[i][j].Offset, firstRead[j].Offset) + } + } + } +} + +// TestEmptyPartitionPolling simulates consumers polling empty partitions +// waiting for data (common in Kafka) +func TestEmptyPartitionPolling(t *testing.T) { + lb := NewLogBuffer("empty-partition-test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + lb.bufferStartOffset = 0 + lb.offset = 0 + + // Try to read from empty partition + messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 10240) + + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if len(messages) != 0 { + t.Errorf("Expected 0 messages, got %d", len(messages)) + } + if nextOffset != 0 { + t.Errorf("Expected nextOffset=0, got %d", nextOffset) + } + if !endOfPartition { + t.Error("Expected endOfPartition=true for future offset") + } +} diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go new file mode 100644 index 
000000000..b57f7742f --- /dev/null +++ b/weed/util/log_buffer/log_read_stateless.go @@ -0,0 +1,639 @@ +package log_buffer + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/protobuf/proto" +) + +// ReadMessagesAtOffset provides Kafka-style stateless reads from LogBuffer +// Each call is completely independent - no state maintained between calls +// Thread-safe for concurrent reads at different offsets +// +// This is the recommended API for stateless clients like Kafka gateway +// Unlike Subscribe loops, this: +// 1. Returns immediately with available data (or empty if none) +// 2. Does not maintain any session state +// 3. Safe for concurrent calls +// 4. No cancellation/restart complexity +// +// Returns: +// - messages: Array of messages starting at startOffset +// - nextOffset: Offset to use for next fetch +// - highWaterMark: Highest offset available in partition +// - endOfPartition: True if no more data available +// - err: Any error encountered +func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages int, maxBytes int) ( + messages []*filer_pb.LogEntry, + nextOffset int64, + highWaterMark int64, + endOfPartition bool, + err error, +) { + glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d", + startOffset, maxMessages, maxBytes) + + // Quick validation + if maxMessages <= 0 { + maxMessages = 100 // Default reasonable batch size + } + if maxBytes <= 0 { + maxBytes = 4 * 1024 * 1024 // 4MB default + } + + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + + // Try to read from in-memory buffers first (hot path) + logBuffer.RLock() + currentBufferEnd := logBuffer.offset + bufferStartOffset := logBuffer.bufferStartOffset + highWaterMark = currentBufferEnd + + glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d", + startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos) + + // Special case: empty buffer (no data written yet) + if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 { + logBuffer.RUnlock() + glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)") + // Return empty result - partition exists but has no data yet + // Preserve the requested offset in nextOffset + return messages, startOffset, 0, true, nil + } + + // Check if requested offset is in current buffer + if startOffset >= bufferStartOffset && startOffset < currentBufferEnd { + glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers") + // Read from current buffer + glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d", + bufferStartOffset, currentBufferEnd) + + if logBuffer.pos > 0 { + // Make a copy of the buffer to avoid concurrent modification + bufCopy := make([]byte, logBuffer.pos) + copy(bufCopy, logBuffer.buf[:logBuffer.pos]) + logBuffer.RUnlock() // Release lock early + + // Parse messages from buffer copy + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.V(4).Infof("[StatelessRead] Read %d messages from current buffer, nextOffset=%d", + len(messages), nextOffset) + + // Check if we reached the end + endOfPartition = (nextOffset >= currentBufferEnd) && (len(messages) == 0 || 
len(messages) < maxMessages) + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + + // Buffer is empty but offset is in range - check previous buffers + logBuffer.RUnlock() + + // Try previous buffers + logBuffer.RLock() + for _, prevBuf := range logBuffer.prevBuffers.buffers { + if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset { + if prevBuf.size > 0 { + // Found in previous buffer + bufCopy := make([]byte, prevBuf.size) + copy(bufCopy, prevBuf.buf[:prevBuf.size]) + logBuffer.RUnlock() + + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.V(4).Infof("[StatelessRead] Read %d messages from previous buffer, nextOffset=%d", + len(messages), nextOffset) + + endOfPartition = false // More data might be in current buffer + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + // Empty previous buffer means data was flushed to disk - fall through to disk read + glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset) + break + } + } + logBuffer.RUnlock() + + // Data not in memory - attempt disk read if configured + // CRITICAL FIX: Don't return error here - data may be on disk! + // Fall through to disk read logic below + glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read", + startOffset, bufferStartOffset, currentBufferEnd) + // Don't return error - continue to disk read check below + } else { + // Offset is not in current buffer - check previous buffers FIRST before going to disk + // This handles the case where data was just flushed but is still in prevBuffers + glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first", + startOffset, bufferStartOffset, currentBufferEnd) + + for _, prevBuf := range logBuffer.prevBuffers.buffers { + if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset { + if prevBuf.size > 0 { + // Found in previous buffer! + bufCopy := make([]byte, prevBuf.size) + copy(bufCopy, prevBuf.buf[:prevBuf.size]) + logBuffer.RUnlock() + + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d", + len(messages), nextOffset) + + endOfPartition = false // More data might exist + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + // Empty previous buffer - data was flushed to disk + glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset) + break + } + } + logBuffer.RUnlock() + } + + // If we get here, unlock if not already unlocked + // (Note: logBuffer.RUnlock() was called above in all paths) + + // Data not in memory - try disk read + // This handles two cases: + // 1. startOffset < bufferStartOffset: Historical data + // 2. 
startOffset in buffer range but not in memory: Data was flushed (from fall-through above) + if startOffset < currentBufferEnd { + glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ") + + // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured + if startOffset < bufferStartOffset { + glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d", + startOffset, bufferStartOffset) + } else { + glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory", + startOffset, bufferStartOffset, currentBufferEnd) + } + + // Check if disk read function is configured + if logBuffer.ReadFromDiskFn == nil { + glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.") + if startOffset < bufferStartOffset { + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil", + startOffset, bufferStartOffset) + } + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil", + startOffset, bufferStartOffset, currentBufferEnd) + } + + glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...") + + // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) + // The ReadFromDiskFn should handle its own timeouts and not block indefinitely + diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( + logBuffer, startOffset, maxMessages, maxBytes, highWaterMark) + + if diskErr != nil { + glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr) + // IMPORTANT: Return retryable error instead of silently returning empty! 
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr) + } + + if len(diskMessages) == 0 { + glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)", + startOffset, highWaterMark, bufferStartOffset) + } else { + glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d", + len(diskMessages), diskNextOffset) + } + + // Return disk data + endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages + return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil + } + + // startOffset >= currentBufferEnd - future offset, no data available yet + glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available", + startOffset, currentBufferEnd) + return messages, startOffset, highWaterMark, true, nil +} + +// readHistoricalDataFromDisk reads messages from disk for historical offsets +// This is called when the requested offset is older than what's in memory +// Uses an in-memory cache to avoid repeated disk I/O for the same chunks +func readHistoricalDataFromDisk( + logBuffer *LogBuffer, + startOffset int64, + maxMessages int, + maxBytes int, + highWaterMark int64, +) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { + const chunkSize = 1000 // Size of each cached chunk + + glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d", + startOffset, maxMessages, maxBytes, highWaterMark) + + // Calculate chunk start offset (aligned to chunkSize boundary) + chunkStartOffset := (startOffset / chunkSize) * chunkSize + + glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset) + + // Try to get from cache first + cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset) + + if cacheHit { + // Found in cache - extract requested messages + glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d", + chunkStartOffset, startOffset, len(cachedMessages)) + + result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes) + + if err != nil { + // CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk + // This means disk files only contain partial data (e.g., 1000-1763) and the + // requested offset (e.g., 1764) is in a gap between disk and memory. + // + // SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset + // continue to check memory buffers. The data might be in memory even though + // it's not on disk. 
+ glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)", + startOffset, chunkStartOffset, len(cachedMessages)) + glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset) + + // Return empty but NO ERROR - this signals "not on disk, try memory" + return nil, startOffset, nil + } + + // Success - return cached data + return result, nextOff, nil + } + + glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn", + chunkStartOffset) + + // Not in cache - read entire chunk from disk for caching + chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize) + chunkNextOffset := chunkStartOffset + + // Create a position for the chunk start + chunkPosition := MessagePosition{ + IsOffsetBased: true, + Offset: chunkStartOffset, + } + + // Define callback to collect the entire chunk + eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Read up to chunkSize messages for caching + if len(chunkMessages) >= chunkSize { + return true, nil + } + + chunkMessages = append(chunkMessages, logEntry) + chunkNextOffset++ + + // Continue reading the chunk + return false, nil + } + + // Read chunk from disk + glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset) + _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn) + + if readErr != nil { + glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr) + return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) + } + + glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages)) + + // Cache the chunk for future reads + if len(chunkMessages) > 0 { + cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages) + glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)", + chunkStartOffset, chunkNextOffset-1, len(chunkMessages)) + } else { + glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset) + } + + // Extract requested messages from the chunk + result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes) + glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr) + return result, resNextOffset, resErr +} + +// getCachedDiskChunk retrieves a cached disk chunk if available +func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) { + logBuffer.diskChunkCache.mu.RLock() + defer logBuffer.diskChunkCache.mu.RUnlock() + + if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { + // Update last access time + chunk.lastAccess = time.Now() + return chunk.messages, true + } + + return nil, false +} + +// invalidateCachedDiskChunk removes a chunk from the cache +// This is called when cached data is found to be incomplete or incorrect +func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) { + logBuffer.diskChunkCache.mu.Lock() + defer logBuffer.diskChunkCache.mu.Unlock() + + if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { + delete(logBuffer.diskChunkCache.chunks, chunkStartOffset) + glog.Infof("[DiskCache] Invalidated chunk at offset %d", chunkStartOffset) + } +} + +// cacheDiskChunk stores a disk chunk in the cache with LRU eviction +func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages 
[]*filer_pb.LogEntry) { + logBuffer.diskChunkCache.mu.Lock() + defer logBuffer.diskChunkCache.mu.Unlock() + + // Check if we need to evict old chunks (LRU policy) + if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks { + // Find least recently used chunk + var oldestOffset int64 + var oldestTime time.Time + first := true + + for offset, chunk := range logBuffer.diskChunkCache.chunks { + if first || chunk.lastAccess.Before(oldestTime) { + oldestOffset = offset + oldestTime = chunk.lastAccess + first = false + } + } + + // Evict oldest chunk + delete(logBuffer.diskChunkCache.chunks, oldestOffset) + glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset) + } + + // Store new chunk + logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{ + startOffset: startOffset, + endOffset: endOffset, + messages: messages, + lastAccess: time.Now(), + } +} + +// extractMessagesFromCache extracts requested messages from a cached chunk +// chunkMessages contains messages starting from the chunk's aligned start offset +// We need to skip to the requested startOffset within the chunk +func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) { + const chunkSize = 1000 + chunkStartOffset := (startOffset / chunkSize) * chunkSize + + // Calculate position within chunk + positionInChunk := int(startOffset - chunkStartOffset) + + // Check if requested offset is within the chunk + if positionInChunk < 0 { + glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEFORE chunk start %d (positionInChunk=%d < 0)", + startOffset, chunkStartOffset, positionInChunk) + return nil, startOffset, fmt.Errorf("offset %d before chunk start %d", startOffset, chunkStartOffset) + } + + if positionInChunk >= len(chunkMessages) { + // Requested offset is beyond the cached chunk + // This happens when disk files only contain partial data + // The requested offset might be in the gap between disk and memory + glog.Infof("[DiskCache] Requested offset %d is beyond cached chunk (chunkStart=%d, cachedSize=%d, positionInChunk=%d)", + startOffset, chunkStartOffset, len(chunkMessages), positionInChunk) + glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk", + chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset) + + // Return empty (data not on disk) - caller will check memory buffers + return nil, startOffset, nil + } + + // Extract messages starting from the requested position + messages := make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset := startOffset + totalBytes := 0 + + for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ { + entry := chunkMessages[i] + entrySize := proto.Size(entry) + + // Check byte limit + if totalBytes > 0 && totalBytes+entrySize > maxBytes { + break + } + + messages = append(messages, entry) + totalBytes += entrySize + nextOffset++ + } + + glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)", + len(messages), startOffset, nextOffset-1, totalBytes) + + return messages, nextOffset, nil +} + +// parseMessagesFromBuffer parses messages from a buffer byte slice +// This is thread-safe as it operates on a copy of the buffer +func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) ( + messages []*filer_pb.LogEntry, + nextOffset int64, + totalBytes int, + err error, +) { + messages = make([]*filer_pb.LogEntry, 0, 
maxMessages) + nextOffset = startOffset + totalBytes = 0 + foundStart := false + + messagesInBuffer := 0 + for pos := 0; pos+4 < len(buf) && len(messages) < maxMessages && totalBytes < maxBytes; { + // Read message size + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + // Incomplete message at end of buffer + glog.V(4).Infof("[parseMessages] Incomplete message at pos %d, size %d, bufLen %d", + pos, size, len(buf)) + break + } + + // Parse message + entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + glog.Warningf("[parseMessages] Failed to unmarshal message: %v", err) + pos += 4 + int(size) + continue + } + + messagesInBuffer++ + + // Initialize foundStart from first message + if !foundStart { + // Find the first message at or after startOffset + if logEntry.Offset >= startOffset { + glog.Infof("[parseMessages] Found first message at/after startOffset %d: logEntry.Offset=%d", startOffset, logEntry.Offset) + foundStart = true + nextOffset = logEntry.Offset + } else { + // Skip messages before startOffset + glog.V(3).Infof("[parseMessages] Skipping message at offset %d (before startOffset %d)", logEntry.Offset, startOffset) + pos += 4 + int(size) + continue + } + } + + // Check if this message matches expected offset + if foundStart && logEntry.Offset >= startOffset { + glog.V(3).Infof("[parseMessages] Adding message at offset %d (count=%d)", logEntry.Offset, len(messages)+1) + messages = append(messages, logEntry) + totalBytes += 4 + int(size) + nextOffset = logEntry.Offset + 1 + } + + pos += 4 + int(size) + } + + glog.Infof("[parseMessages] Parsed buffer: requested startOffset=%d, messagesInBuffer=%d, messagesReturned=%d, nextOffset=%d", + startOffset, messagesInBuffer, len(messages), nextOffset) + + glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d", + len(messages), nextOffset, totalBytes) + + return messages, nextOffset, totalBytes, nil +} + +// readMessagesFromDisk reads messages from disk using the ReadFromDiskFn +func (logBuffer *LogBuffer) readMessagesFromDisk(startOffset int64, maxMessages int, maxBytes int, highWaterMark int64) ( + messages []*filer_pb.LogEntry, + nextOffset int64, + highWaterMark2 int64, + endOfPartition bool, + err error, +) { + if logBuffer.ReadFromDiskFn == nil { + return nil, startOffset, highWaterMark, true, + fmt.Errorf("no disk read function configured") + } + + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + totalBytes := 0 + + // Use a simple callback to collect messages + collectFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + // Check limits + if len(messages) >= maxMessages { + return true, nil // Done + } + + entrySize := 4 + len(logEntry.Data) + len(logEntry.Key) + if totalBytes+entrySize > maxBytes { + return true, nil // Done + } + + // Only include messages at or after startOffset + if logEntry.Offset >= startOffset { + messages = append(messages, logEntry) + totalBytes += entrySize + nextOffset = logEntry.Offset + 1 + } + + return false, nil // Continue + } + + // Read from disk + startPos := NewMessagePositionFromOffset(startOffset) + _, isDone, err := logBuffer.ReadFromDiskFn(startPos, 0, collectFn) + + if err != nil { + glog.Warningf("[StatelessRead] Disk read error: %v", err) + return nil, startOffset, highWaterMark, false, err + } + + glog.V(4).Infof("[StatelessRead] Read %d messages from disk, nextOffset=%d, isDone=%v", + len(messages), 
nextOffset, isDone) + + // If we read from disk and got no messages, and isDone is true, we're at the end + endOfPartition = isDone && len(messages) == 0 + + return messages, nextOffset, highWaterMark, endOfPartition, nil +} + +// GetHighWaterMark returns the highest offset available in this partition +// This is a lightweight operation for clients to check partition state +func (logBuffer *LogBuffer) GetHighWaterMark() int64 { + logBuffer.RLock() + defer logBuffer.RUnlock() + return logBuffer.offset +} + +// GetLogStartOffset returns the earliest offset available (either in memory or on disk) +// This is useful for clients to know the valid offset range +func (logBuffer *LogBuffer) GetLogStartOffset() int64 { + logBuffer.RLock() + defer logBuffer.RUnlock() + + // Check if we have offset information + if !logBuffer.hasOffsets { + return 0 + } + + // Return the current buffer start offset - this is the earliest offset in memory RIGHT NOW + // For stateless fetch, we only return what's currently available in memory + // We don't check prevBuffers because they may be stale or getting flushed + return logBuffer.bufferStartOffset +} + +// WaitForDataWithTimeout waits up to maxWaitMs for data to be available at startOffset +// Returns true if data became available, false if timeout +// This allows "long poll" behavior for real-time consumers +func (logBuffer *LogBuffer) WaitForDataWithTimeout(startOffset int64, maxWaitMs int) bool { + if maxWaitMs <= 0 { + return false + } + + timeout := time.NewTimer(time.Duration(maxWaitMs) * time.Millisecond) + defer timeout.Stop() + + // Register for notifications + notifyChan := logBuffer.RegisterSubscriber(fmt.Sprintf("fetch-%d", startOffset)) + defer logBuffer.UnregisterSubscriber(fmt.Sprintf("fetch-%d", startOffset)) + + // Check if data is already available + logBuffer.RLock() + currentEnd := logBuffer.offset + logBuffer.RUnlock() + + if currentEnd >= startOffset { + return true + } + + // Wait for notification or timeout + select { + case <-notifyChan: + // Data might be available now + logBuffer.RLock() + currentEnd := logBuffer.offset + logBuffer.RUnlock() + return currentEnd >= startOffset + case <-timeout.C: + return false + } +} diff --git a/weed/util/log_buffer/log_read_stateless_test.go b/weed/util/log_buffer/log_read_stateless_test.go new file mode 100644 index 000000000..948a929ba --- /dev/null +++ b/weed/util/log_buffer/log_read_stateless_test.go @@ -0,0 +1,372 @@ +package log_buffer + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +func TestReadMessagesAtOffset_EmptyBuffer(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + lb.bufferStartOffset = 0 + lb.offset = 0 // Empty buffer + + messages, nextOffset, hwm, endOfPartition, err := lb.ReadMessagesAtOffset(100, 10, 1024) + + // Reading from future offset (100) when buffer is at 0 + // Should return empty, no error + if err != nil { + t.Errorf("Expected no error for future offset, got %v", err) + } + if len(messages) != 0 { + t.Errorf("Expected 0 messages, got %d", len(messages)) + } + if nextOffset != 100 { + t.Errorf("Expected nextOffset=100, got %d", nextOffset) + } + if !endOfPartition { + t.Error("Expected endOfPartition=true for future offset") + } + if hwm != 0 { + t.Errorf("Expected highWaterMark=0, got %d", hwm) + } +} + +func TestReadMessagesAtOffset_SingleMessage(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add a message + entry 
:= &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key1"), + Data: []byte("value1"), + Offset: 0, + } + lb.AddLogEntryToBuffer(entry) + + // Read from offset 0 + messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 1024) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if len(messages) != 1 { + t.Errorf("Expected 1 message, got %d", len(messages)) + } + if nextOffset != 1 { + t.Errorf("Expected nextOffset=1, got %d", nextOffset) + } + if !endOfPartition { + t.Error("Expected endOfPartition=true after reading all messages") + } + if messages[0].Offset != 0 { + t.Errorf("Expected message offset=0, got %d", messages[0].Offset) + } + if string(messages[0].Key) != "key1" { + t.Errorf("Expected key='key1', got '%s'", string(messages[0].Key)) + } +} + +func TestReadMessagesAtOffset_MultipleMessages(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add 5 messages + for i := 0; i < 5; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // Read from offset 0, max 3 messages + messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(0, 3, 10240) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if len(messages) != 3 { + t.Errorf("Expected 3 messages, got %d", len(messages)) + } + if nextOffset != 3 { + t.Errorf("Expected nextOffset=3, got %d", nextOffset) + } + + // Verify offsets are sequential + for i, msg := range messages { + if msg.Offset != int64(i) { + t.Errorf("Message %d: expected offset=%d, got %d", i, i, msg.Offset) + } + } +} + +func TestReadMessagesAtOffset_StartFromMiddle(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add 10 messages (0-9) + for i := 0; i < 10; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // Read from offset 5 + messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(5, 3, 10240) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if len(messages) != 3 { + t.Errorf("Expected 3 messages, got %d", len(messages)) + } + if nextOffset != 8 { + t.Errorf("Expected nextOffset=8, got %d", nextOffset) + } + + // Verify we got messages 5, 6, 7 + expectedOffsets := []int64{5, 6, 7} + for i, msg := range messages { + if msg.Offset != expectedOffsets[i] { + t.Errorf("Message %d: expected offset=%d, got %d", i, expectedOffsets[i], msg.Offset) + } + } +} + +func TestReadMessagesAtOffset_MaxBytesLimit(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add messages with 100 bytes each + for i := 0; i < 10; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: make([]byte, 100), // 100 bytes + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // Request with max 250 bytes (should get ~2 messages) + messages, _, _, _, err := lb.ReadMessagesAtOffset(0, 100, 250) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // Should get at least 1 message, but likely 2 + if len(messages) == 0 { + t.Error("Expected at least 1 message") + } + if len(messages) > 3 { + t.Errorf("Expected max 3 messages with 250 byte limit, got %d", len(messages)) + } +} + +func 
TestReadMessagesAtOffset_ConcurrentReads(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add 100 messages + for i := 0; i < 100; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // Start 10 concurrent readers at different offsets + done := make(chan bool, 10) + + for reader := 0; reader < 10; reader++ { + startOffset := int64(reader * 10) + go func(offset int64) { + messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(offset, 5, 10240) + + if err != nil { + t.Errorf("Reader at offset %d: unexpected error: %v", offset, err) + } + if len(messages) != 5 { + t.Errorf("Reader at offset %d: expected 5 messages, got %d", offset, len(messages)) + } + if nextOffset != offset+5 { + t.Errorf("Reader at offset %d: expected nextOffset=%d, got %d", offset, offset+5, nextOffset) + } + + // Verify sequential offsets + for i, msg := range messages { + expectedOffset := offset + int64(i) + if msg.Offset != expectedOffset { + t.Errorf("Reader at offset %d: message %d has offset %d, expected %d", + offset, i, msg.Offset, expectedOffset) + } + } + + done <- true + }(startOffset) + } + + // Wait for all readers + for i := 0; i < 10; i++ { + <-done + } +} + +func TestReadMessagesAtOffset_FutureOffset(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add 5 messages (0-4) + for i := 0; i < 5; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // Try to read from offset 10 (future) + messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(10, 10, 10240) + + if err != nil { + t.Errorf("Expected no error for future offset, got %v", err) + } + if len(messages) != 0 { + t.Errorf("Expected 0 messages for future offset, got %d", len(messages)) + } + if nextOffset != 10 { + t.Errorf("Expected nextOffset=10, got %d", nextOffset) + } + if !endOfPartition { + t.Error("Expected endOfPartition=true for future offset") + } +} + +func TestWaitForDataWithTimeout_DataAvailable(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Add message at offset 0 + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: 0, + } + lb.AddLogEntryToBuffer(entry) + + // Wait for data at offset 0 (should return immediately) + dataAvailable := lb.WaitForDataWithTimeout(0, 100) + + if !dataAvailable { + t.Error("Expected data to be available at offset 0") + } +} + +func TestWaitForDataWithTimeout_NoData(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + lb.bufferStartOffset = 0 + lb.offset = 0 + + // Don't add any messages, wait for offset 10 + + // Wait for data at offset 10 with short timeout + start := time.Now() + dataAvailable := lb.WaitForDataWithTimeout(10, 50) + elapsed := time.Since(start) + + if dataAvailable { + t.Error("Expected no data to be available") + } + // Note: Actual wait time may be shorter if subscriber mechanism + // returns immediately. Just verify no data was returned. 
+ t.Logf("Waited %v for timeout", elapsed) +} + +func TestWaitForDataWithTimeout_DataArrives(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Start waiting in background + done := make(chan bool) + var dataAvailable bool + + go func() { + dataAvailable = lb.WaitForDataWithTimeout(0, 500) + done <- true + }() + + // Add data after 50ms + time.Sleep(50 * time.Millisecond) + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: 0, + } + lb.AddLogEntryToBuffer(entry) + + // Wait for result + <-done + + if !dataAvailable { + t.Error("Expected data to become available after being added") + } +} + +func TestGetHighWaterMark(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + + // Initially should be 0 + hwm := lb.GetHighWaterMark() + if hwm != 0 { + t.Errorf("Expected initial HWM=0, got %d", hwm) + } + + // Add messages (offsets 0-4) + for i := 0; i < 5; i++ { + entry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: []byte("value"), + Offset: int64(i), + } + lb.AddLogEntryToBuffer(entry) + } + + // HWM should be 5 (next offset to write, not last written offset) + // This matches Kafka semantics where HWM = last offset + 1 + hwm = lb.GetHighWaterMark() + if hwm != 5 { + t.Errorf("Expected HWM=5 after adding 5 messages (0-4), got %d", hwm) + } +} + +func TestGetLogStartOffset(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + lb.hasOffsets = true + lb.bufferStartOffset = 10 + + lso := lb.GetLogStartOffset() + if lso != 10 { + t.Errorf("Expected LSO=10, got %d", lso) + } +} |
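
Editor's note: the stateless read path added above is easiest to see end to end as a consumer-style polling loop. The sketch below is not part of the diff; it is a hypothetical test (TestStatelessConsumeLoop_Sketch) written in the same package, reusing only calls already exercised by log_read_stateless_test.go (NewLogBuffer, AddLogEntryToBuffer, ReadMessagesAtOffset, GetLogStartOffset, GetHighWaterMark, WaitForDataWithTimeout) and assuming they behave as those tests assert.

package log_buffer

import (
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// TestStatelessConsumeLoop_Sketch is an illustrative, hypothetical test showing how a
// consumer drives the stateless read API: every fetch carries its own offset (no
// broker-side cursor), and WaitForDataWithTimeout gives long-poll behavior once the
// high water mark is reached.
func TestStatelessConsumeLoop_Sketch(t *testing.T) {
	lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
	lb.hasOffsets = true

	// Produce 10 messages at offsets 0-9, mirroring the setup used by the tests above.
	for i := 0; i < 10; i++ {
		lb.AddLogEntryToBuffer(&filer_pb.LogEntry{
			TsNs:   time.Now().UnixNano(),
			Key:    []byte("key"),
			Data:   []byte("value"),
			Offset: int64(i),
		})
	}

	// Consume from the log start offset up to the high water mark (last offset + 1).
	offset := lb.GetLogStartOffset()
	consumed := 0
	for offset < lb.GetHighWaterMark() {
		messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(offset, 3, 10240)
		if err != nil {
			t.Fatalf("unexpected error at offset %d: %v", offset, err)
		}
		if len(messages) == 0 {
			break // nothing more in memory; avoid spinning in this sketch
		}
		consumed += len(messages)
		offset = nextOffset
	}
	if consumed != 10 {
		t.Errorf("expected to consume 10 messages, consumed %d", consumed)
	}

	// Beyond the high water mark there is no data yet; a short long-poll should time out.
	if lb.WaitForDataWithTimeout(lb.GetHighWaterMark()+1, 20) {
		t.Error("expected timeout waiting for data beyond the high water mark")
	}
}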
