Diffstat (limited to 'weed/util/log_buffer/log_buffer_flush_gap_test.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer_flush_gap_test.go | 680 |
1 file changed, 680 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go
new file mode 100644
index 000000000..63d344b1a
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go
@@ -0,0 +1,680 @@
+package log_buffer
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"google.golang.org/protobuf/proto"
+)
+
+// TestFlushOffsetGap_ReproduceDataLoss reproduces the critical bug where messages
+// are lost in the gap between flushed disk data and the in-memory buffer.
+//
+// OBSERVED BEHAVIOR FROM LOGS:
+//	Request offset:          1764
+//	Disk contains:           1000-1763 (764 messages)
+//	Memory buffer starts at: 1800
+//	Gap:                     1764-1799 (36 messages) ← MISSING!
+//
+// This test verifies:
+//  1. All messages sent to the buffer are accounted for
+//  2. No gaps exist between disk and memory offsets
+//  3. Flushed data and in-memory data have continuous offset ranges
+func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
+	var flushedMessages []*filer_pb.LogEntry
+	var flushMu sync.Mutex
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf))
+
+		// Parse and store flushed messages
+		flushMu.Lock()
+		defer flushMu.Unlock()
+
+		// Parse buffer to extract messages
+		parsedCount := 0
+		for pos := 0; pos+4 < len(buf); {
+			if pos+4 > len(buf) {
+				break
+			}
+
+			size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+
+			entryData := buf[pos+4 : pos+4+int(size)]
+			logEntry := &filer_pb.LogEntry{}
+			if err := proto.Unmarshal(entryData, logEntry); err == nil {
+				flushedMessages = append(flushedMessages, logEntry)
+				parsedCount++
+			}
+
+			pos += 4 + int(size)
+		}
+
+		t.Logf("  Parsed %d messages from flush buffer", parsedCount)
+	}
+
+	logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil)
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Send 100 messages
+	messageCount := 100
+	t.Logf("Sending %d messages...", messageCount)
+
+	for i := 0; i < messageCount; i++ {
+		logBuffer.AddToBuffer(&mq_pb.DataMessage{
+			Key:   []byte(fmt.Sprintf("key-%d", i)),
+			Value: []byte(fmt.Sprintf("message-%d", i)),
+			TsNs:  time.Now().UnixNano(),
+		})
+	}
+
+	// Force flush multiple times to simulate a real workload
+	t.Logf("Forcing flush...")
+	logBuffer.ForceFlush()
+
+	// Add more messages after the flush
+	for i := messageCount; i < messageCount+50; i++ {
+		logBuffer.AddToBuffer(&mq_pb.DataMessage{
+			Key:   []byte(fmt.Sprintf("key-%d", i)),
+			Value: []byte(fmt.Sprintf("message-%d", i)),
+			TsNs:  time.Now().UnixNano(),
+		})
+	}
+
+	// Force another flush
+	logBuffer.ForceFlush()
+	time.Sleep(200 * time.Millisecond) // Wait for flush to complete
+
+	// Now check the buffer state
+	logBuffer.RLock()
+	bufferStartOffset := logBuffer.bufferStartOffset
+	currentOffset := logBuffer.offset
+	pos := logBuffer.pos
+	logBuffer.RUnlock()
+
+	flushMu.Lock()
+	flushedCount := len(flushedMessages)
+	var maxFlushedOffset int64 = -1
+	var minFlushedOffset int64 = -1
+	if flushedCount > 0 {
+		minFlushedOffset = flushedMessages[0].Offset
+		maxFlushedOffset = flushedMessages[flushedCount-1].Offset
+	}
+	flushMu.Unlock()
+
+	t.Logf("\nBUFFER STATE AFTER FLUSH:")
+	t.Logf("  bufferStartOffset: %d", bufferStartOffset)
+	t.Logf("  currentOffset (HWM): %d", currentOffset)
t.Logf(" pos (bytes in buffer): %d", pos) + t.Logf(" Messages sent: %d (offsets 0-%d)", messageCount+50, messageCount+49) + t.Logf(" Messages flushed to disk: %d (offsets %d-%d)", flushedCount, minFlushedOffset, maxFlushedOffset) + + // CRITICAL CHECK: Is there a gap between flushed data and memory buffer? + if flushedCount > 0 && maxFlushedOffset >= 0 { + gap := bufferStartOffset - (maxFlushedOffset + 1) + + t.Logf("\nOFFSET CONTINUITY CHECK:") + t.Logf(" Last flushed offset: %d", maxFlushedOffset) + t.Logf(" Buffer starts at: %d", bufferStartOffset) + t.Logf(" Gap: %d offsets", gap) + + if gap > 0 { + t.Errorf("❌ CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!") + t.Errorf(" Disk has offsets %d-%d", minFlushedOffset, maxFlushedOffset) + t.Errorf(" Memory buffer starts at: %d", bufferStartOffset) + t.Errorf(" MISSING OFFSETS: %d-%d (%d messages)", maxFlushedOffset+1, bufferStartOffset-1, gap) + t.Errorf(" These messages are LOST - neither on disk nor in memory!") + } else if gap < 0 { + t.Errorf("❌ OFFSET OVERLAP: Memory buffer starts BEFORE last flushed offset!") + t.Errorf(" This indicates data corruption or race condition") + } else { + t.Logf("✅ PASS: No gap detected - offsets are continuous") + } + + // Check if we can read all expected offsets + t.Logf("\nREADABILITY CHECK:") + for testOffset := int64(0); testOffset < currentOffset; testOffset += 10 { + // Try to read from buffer + requestPosition := NewMessagePositionFromOffset(testOffset) + buf, _, err := logBuffer.ReadFromBuffer(requestPosition) + + isReadable := (buf != nil && len(buf.Bytes()) > 0) || err == ResumeFromDiskError + status := "✅" + if !isReadable && err == nil { + status = "❌ NOT READABLE" + } + + t.Logf(" Offset %d: %s (buf=%v, err=%v)", testOffset, status, buf != nil, err) + + // If offset is in the gap, it should fail to read + if flushedCount > 0 && testOffset > maxFlushedOffset && testOffset < bufferStartOffset { + if isReadable { + t.Errorf(" Unexpected: Offset %d in gap range should NOT be readable!", testOffset) + } else { + t.Logf(" Expected: Offset %d in gap is not readable (data lost)", testOffset) + } + } + } + } + + // Check that all sent messages are accounted for + expectedMessageCount := messageCount + 50 + messagesInMemory := int(currentOffset - bufferStartOffset) + totalAccountedFor := flushedCount + messagesInMemory + + t.Logf("\nMESSAGE ACCOUNTING:") + t.Logf(" Expected: %d messages", expectedMessageCount) + t.Logf(" Flushed to disk: %d", flushedCount) + t.Logf(" In memory buffer: %d (offset range %d-%d)", messagesInMemory, bufferStartOffset, currentOffset-1) + t.Logf(" Total accounted for: %d", totalAccountedFor) + t.Logf(" Missing: %d messages", expectedMessageCount-totalAccountedFor) + + if totalAccountedFor < expectedMessageCount { + t.Errorf("❌ DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor) + } else { + t.Logf("✅ All messages accounted for") + } +} + +// TestFlushOffsetGap_CheckPrevBuffers tests if messages might be stuck in prevBuffers +// instead of being properly flushed to disk. 
+
+// TestFlushOffsetGap_CheckPrevBuffers tests if messages might be stuck in prevBuffers
+// instead of being properly flushed to disk.
+func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
+	var flushCount int
+	var flushMu sync.Mutex
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		flushMu.Lock()
+		flushCount++
+		count := flushCount
+		flushMu.Unlock()
+
+		t.Logf("FLUSH #%d: minOffset=%d maxOffset=%d size=%d bytes", count, minOffset, maxOffset, len(buf))
+	}
+
+	logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil)
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Send messages in batches with flushes in between
+	for batch := 0; batch < 5; batch++ {
+		t.Logf("\nBatch %d:", batch)
+
+		// Send 20 messages
+		for i := 0; i < 20; i++ {
+			offset := int64(batch*20 + i)
+			logBuffer.AddToBuffer(&mq_pb.DataMessage{
+				Key:   []byte(fmt.Sprintf("key-%d", offset)),
+				Value: []byte(fmt.Sprintf("message-%d", offset)),
+				TsNs:  time.Now().UnixNano(),
+			})
+		}
+
+		// Check state before flush
+		logBuffer.RLock()
+		beforeFlushOffset := logBuffer.offset
+		beforeFlushStart := logBuffer.bufferStartOffset
+		logBuffer.RUnlock()
+
+		// Force flush
+		logBuffer.ForceFlush()
+		time.Sleep(50 * time.Millisecond)
+
+		// Check state after flush
+		logBuffer.RLock()
+		afterFlushOffset := logBuffer.offset
+		afterFlushStart := logBuffer.bufferStartOffset
+		prevBufferCount := len(logBuffer.prevBuffers.buffers)
+
+		// Check prevBuffers state
+		t.Logf("  Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart)
+		t.Logf("  After flush: offset=%d, bufferStartOffset=%d, prevBuffers=%d",
+			afterFlushOffset, afterFlushStart, prevBufferCount)
+
+		// Check each prevBuffer
+		for i, prevBuf := range logBuffer.prevBuffers.buffers {
+			if prevBuf.size > 0 {
+				t.Logf("  prevBuffer[%d]: offsets %d-%d, size=%d bytes (NOT FLUSHED!)",
+					i, prevBuf.startOffset, prevBuf.offset, prevBuf.size)
+			}
+		}
+		logBuffer.RUnlock()
+
+		// CRITICAL: Check if bufferStartOffset advanced correctly
+		expectedNewStart := beforeFlushOffset
+		if afterFlushStart != expectedNewStart {
+			t.Errorf("  ❌ bufferStartOffset mismatch!")
+			t.Errorf("     Expected: %d (= offset before flush)", expectedNewStart)
+			t.Errorf("     Actual:   %d", afterFlushStart)
+			t.Errorf("     Gap: %d offsets", expectedNewStart-afterFlushStart)
+		}
+	}
+}
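The check above reduces to a simple invariant: a correct flush hands offsets [bufferStartOffset, offset) to disk and then advances bufferStartOffset to the old high-water mark, so disk and memory share the boundary maxFlushed+1 == bufferStartOffset. A toy model of that invariant under the same assumption (offsetWindow is a hypothetical stand-in for the real LogBuffer fields read under RLock above):

package main

import "fmt"

// offsetWindow is a hypothetical stand-in for the LogBuffer offset fields.
type offsetWindow struct {
	bufferStartOffset int64 // first offset still held in memory
	hwm               int64 // next offset to assign (logBuffer.offset in the tests)
}

// flush models a correct flush: offsets [bufferStartOffset, hwm) go to disk,
// then the memory window is advanced to begin at hwm.
func (w *offsetWindow) flush() (minFlushed, maxFlushed int64) {
	minFlushed, maxFlushed = w.bufferStartOffset, w.hwm-1
	w.bufferStartOffset = w.hwm // the advance the test asserts on
	return
}

func main() {
	w := &offsetWindow{bufferStartOffset: 1000, hwm: 1764}
	_, maxFlushed := w.flush()
	// Same continuity arithmetic as the tests: a positive gap means data loss.
	fmt.Println("gap:", w.bufferStartOffset-(maxFlushed+1)) // gap: 0
}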
+
+// TestFlushOffsetGap_ConcurrentWriteAndFlush tests for race conditions
+// between writing new messages and flushing old ones.
+func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
+	var allFlushedOffsets []int64
+	var flushMu sync.Mutex
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf))
+
+		flushMu.Lock()
+		// Record the offset range that was flushed
+		for offset := minOffset; offset <= maxOffset; offset++ {
+			allFlushedOffsets = append(allFlushedOffsets, offset)
+		}
+		flushMu.Unlock()
+	}
+
+	logBuffer := NewLogBuffer("test", 50*time.Millisecond, flushFn, nil, nil)
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Concurrently write messages and force flushes
+	var wg sync.WaitGroup
+
+	// Writer goroutine
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 200; i++ {
+			logBuffer.AddToBuffer(&mq_pb.DataMessage{
+				Key:   []byte(fmt.Sprintf("key-%d", i)),
+				Value: []byte(fmt.Sprintf("message-%d", i)),
+				TsNs:  time.Now().UnixNano(),
+			})
+			if i%50 == 0 {
+				time.Sleep(10 * time.Millisecond)
+			}
+		}
+	}()
+
+	// Flusher goroutine
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 5; i++ {
+			time.Sleep(30 * time.Millisecond)
+			logBuffer.ForceFlush()
+		}
+	}()
+
+	wg.Wait()
+	time.Sleep(200 * time.Millisecond) // Wait for final flush
+
+	// Check final state
+	logBuffer.RLock()
+	finalOffset := logBuffer.offset
+	finalBufferStart := logBuffer.bufferStartOffset
+	logBuffer.RUnlock()
+
+	flushMu.Lock()
+	flushedCount := len(allFlushedOffsets)
+	flushMu.Unlock()
+
+	expectedCount := int(finalOffset)
+	inMemory := int(finalOffset - finalBufferStart)
+	totalAccountedFor := flushedCount + inMemory
+
+	t.Logf("\nFINAL STATE:")
+	t.Logf("  Total messages sent: %d (offsets 0-%d)", expectedCount, expectedCount-1)
+	t.Logf("  Flushed to disk: %d", flushedCount)
+	t.Logf("  In memory: %d (offsets %d-%d)", inMemory, finalBufferStart, finalOffset-1)
+	t.Logf("  Total accounted: %d", totalAccountedFor)
+	t.Logf("  Missing: %d", expectedCount-totalAccountedFor)
+
+	if totalAccountedFor < expectedCount {
+		t.Errorf("❌ DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor)
+	}
+}
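The accounting at the end of this test is an identity worth stating on its own: of the hwm offsets assigned so far, everything not yet flushed must lie inside the in-memory window [bufferStartOffset, hwm), so flushed + inMemory must equal hwm. A minimal sketch (missing is a hypothetical helper):

package main

import "fmt"

// missing returns how many of the hwm assigned offsets are neither counted
// as flushed nor inside the in-memory window [bufferStart, hwm).
func missing(hwm, bufferStart, flushed int64) int64 {
	inMemory := hwm - bufferStart
	return hwm - (flushed + inMemory)
}

func main() {
	// 250 offsets assigned, 214 flushed, memory holds [214, 250): nothing lost.
	fmt.Println(missing(250, 214, 214)) // 0
	// Same counts, but memory starts at 220: offsets 214-219 fell into a gap.
	fmt.Println(missing(250, 220, 214)) // 6
}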
+
+// TestFlushOffsetGap_ProductionScenario reproduces the actual production scenario
+// where the broker uses AddLogEntryToBuffer with explicit Kafka offsets.
+// This simulates leader publishing with offset assignment.
+func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
+	var flushedData []struct {
+		minOffset int64
+		maxOffset int64
+		messages  []*filer_pb.LogEntry
+	}
+	var flushMu sync.Mutex
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		// Parse messages from buffer
+		messages := []*filer_pb.LogEntry{}
+		for pos := 0; pos+4 < len(buf); {
+			size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+			entryData := buf[pos+4 : pos+4+int(size)]
+			logEntry := &filer_pb.LogEntry{}
+			if err := proto.Unmarshal(entryData, logEntry); err == nil {
+				messages = append(messages, logEntry)
+			}
+			pos += 4 + int(size)
+		}
+
+		flushMu.Lock()
+		flushedData = append(flushedData, struct {
+			minOffset int64
+			maxOffset int64
+			messages  []*filer_pb.LogEntry
+		}{minOffset, maxOffset, messages})
+		flushMu.Unlock()
+
+		t.Logf("FLUSH: minOffset=%d maxOffset=%d, parsed %d messages", minOffset, maxOffset, len(messages))
+	}
+
+	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil)
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Simulate broker behavior: assign Kafka offsets and add to buffer.
+	// This is what PublishWithOffset() does.
+	nextKafkaOffset := int64(0)
+
+	// Round 1: Add 50 messages with Kafka offsets 0-49
+	t.Logf("\n=== ROUND 1: Adding messages 0-49 ===")
+	for i := 0; i < 50; i++ {
+		logEntry := &filer_pb.LogEntry{
+			Key:    []byte(fmt.Sprintf("key-%d", i)),
+			Data:   []byte(fmt.Sprintf("message-%d", i)),
+			TsNs:   time.Now().UnixNano(),
+			Offset: nextKafkaOffset, // Explicit Kafka offset
+		}
+		logBuffer.AddLogEntryToBuffer(logEntry)
+		nextKafkaOffset++
+	}
+
+	// Check buffer state before flush
+	logBuffer.RLock()
+	beforeFlushOffset := logBuffer.offset
+	beforeFlushStart := logBuffer.bufferStartOffset
+	logBuffer.RUnlock()
+	t.Logf("Before flush: logBuffer.offset=%d, bufferStartOffset=%d, nextKafkaOffset=%d",
+		beforeFlushOffset, beforeFlushStart, nextKafkaOffset)
+
+	// Flush
+	logBuffer.ForceFlush()
+	time.Sleep(100 * time.Millisecond)
+
+	// Check buffer state after flush
+	logBuffer.RLock()
+	afterFlushOffset := logBuffer.offset
+	afterFlushStart := logBuffer.bufferStartOffset
+	logBuffer.RUnlock()
+	t.Logf("After flush: logBuffer.offset=%d, bufferStartOffset=%d",
+		afterFlushOffset, afterFlushStart)
+
+	// Round 2: Add another 50 messages with Kafka offsets 50-99
+	t.Logf("\n=== ROUND 2: Adding messages 50-99 ===")
+	for i := 0; i < 50; i++ {
+		logEntry := &filer_pb.LogEntry{
+			Key:    []byte(fmt.Sprintf("key-%d", 50+i)),
+			Data:   []byte(fmt.Sprintf("message-%d", 50+i)),
+			TsNs:   time.Now().UnixNano(),
+			Offset: nextKafkaOffset,
+		}
+		logBuffer.AddLogEntryToBuffer(logEntry)
+		nextKafkaOffset++
+	}
+
+	logBuffer.ForceFlush()
+	time.Sleep(100 * time.Millisecond)
+
+	// Verification: Check if all Kafka offsets are accounted for
+	flushMu.Lock()
+	t.Logf("\n=== VERIFICATION ===")
+	t.Logf("Expected Kafka offsets: 0-%d", nextKafkaOffset-1)
+
+	allOffsets := make(map[int64]bool)
+	for flushIdx, flush := range flushedData {
+		t.Logf("Flush #%d: minOffset=%d, maxOffset=%d, messages=%d",
+			flushIdx, flush.minOffset, flush.maxOffset, len(flush.messages))
+
+		for _, msg := range flush.messages {
+			if allOffsets[msg.Offset] {
+				t.Errorf("  ❌ DUPLICATE: Offset %d appears multiple times!", msg.Offset)
+			}
+			allOffsets[msg.Offset] = true
+		}
+	}
+	flushMu.Unlock()
+
+	// Check for missing offsets
+	missingOffsets := []int64{}
+	for expectedOffset := int64(0); expectedOffset < nextKafkaOffset; expectedOffset++ {
+		if !allOffsets[expectedOffset] {
+			missingOffsets = append(missingOffsets, expectedOffset)
+		}
+	}
+
+	if len(missingOffsets) > 0 {
+		t.Errorf("\n❌ MISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets))
+		if len(missingOffsets) <= 20 {
+			t.Errorf("Missing: %v", missingOffsets)
+		} else {
+			t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20)
+		}
+		t.Errorf("\nThis reproduces the production bug!")
+	} else {
+		t.Logf("\n✅ SUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1)
+	}
+
+	// Check buffer offset consistency
+	logBuffer.RLock()
+	finalOffset := logBuffer.offset
+	finalBufferStart := logBuffer.bufferStartOffset
+	logBuffer.RUnlock()
+
+	t.Logf("\nFinal buffer state:")
+	t.Logf("  logBuffer.offset: %d", finalOffset)
+	t.Logf("  bufferStartOffset: %d", finalBufferStart)
+	t.Logf("  Expected (nextKafkaOffset): %d", nextKafkaOffset)
+
+	if finalOffset != nextKafkaOffset {
+		t.Errorf("❌ logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset)
+	}
+}
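The round-trip above mirrors what the test's comments attribute to PublishWithOffset(): the leader assigns a monotonically increasing Kafka offset, stamps it on the LogEntry, and only then appends, so the buffer never sees an unnumbered entry. A sketch of that pattern assuming an atomic counter; assignAndAppend, the appender interface, and sliceAppender are illustrative, not the broker's real API:

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// appender abstracts the one LogBuffer method this sketch needs.
type appender interface {
	AddLogEntryToBuffer(entry *filer_pb.LogEntry)
}

var nextKafkaOffset atomic.Int64

// assignAndAppend claims the next offset atomically, stamps it on the
// entry, and appends it, so offsets stay dense even with many publishers.
func assignAndAppend(buf appender, key, value []byte) int64 {
	offset := nextKafkaOffset.Add(1) - 1
	buf.AddLogEntryToBuffer(&filer_pb.LogEntry{
		Key:    key,
		Data:   value,
		TsNs:   time.Now().UnixNano(),
		Offset: offset,
	})
	return offset
}

// sliceAppender is a stand-in for the real buffer, for the example below.
type sliceAppender struct{ entries []*filer_pb.LogEntry }

func (s *sliceAppender) AddLogEntryToBuffer(e *filer_pb.LogEntry) {
	s.entries = append(s.entries, e)
}

func main() {
	buf := &sliceAppender{}
	for i := 0; i < 3; i++ {
		assignAndAppend(buf, []byte("k"), []byte("v"))
	}
	fmt.Println(len(buf.entries), buf.entries[2].Offset) // 3 2
}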
+
+// TestFlushOffsetGap_ConcurrentReadDuringFlush tests if concurrent reads
+// during flush can cause messages to be missed.
+func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
+	var flushedOffsets []int64
+	var flushMu sync.Mutex
+
+	readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+		// Simulate reading from disk - return flushed offsets
+		flushMu.Lock()
+		defer flushMu.Unlock()
+
+		for _, offset := range flushedOffsets {
+			if offset >= startPosition.Offset {
+				logEntry := &filer_pb.LogEntry{
+					Key:    []byte(fmt.Sprintf("key-%d", offset)),
+					Data:   []byte(fmt.Sprintf("message-%d", offset)),
+					TsNs:   time.Now().UnixNano(),
+					Offset: offset,
+				}
+				isDone, err := eachLogEntryFn(logEntry)
+				if err != nil || isDone {
+					return NewMessagePositionFromOffset(offset + 1), isDone, err
+				}
+			}
+		}
+		return startPosition, false, nil
+	}
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		// Parse and store flushed offsets
+		flushMu.Lock()
+		defer flushMu.Unlock()
+
+		for pos := 0; pos+4 < len(buf); {
+			size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+			entryData := buf[pos+4 : pos+4+int(size)]
+			logEntry := &filer_pb.LogEntry{}
+			if err := proto.Unmarshal(entryData, logEntry); err == nil {
+				flushedOffsets = append(flushedOffsets, logEntry.Offset)
+			}
+			pos += 4 + int(size)
+		}
+
+		t.Logf("FLUSH: Stored %d offsets to disk (minOffset=%d, maxOffset=%d)",
+			len(flushedOffsets), minOffset, maxOffset)
+	}
+
+	logBuffer := NewLogBuffer("test", time.Hour, flushFn, readFromDiskFn, nil)
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Add 100 messages
+	t.Logf("Adding 100 messages...")
+	for i := int64(0); i < 100; i++ {
+		logEntry := &filer_pb.LogEntry{
+			Key:    []byte(fmt.Sprintf("key-%d", i)),
+			Data:   []byte(fmt.Sprintf("message-%d", i)),
+			TsNs:   time.Now().UnixNano(),
+			Offset: i,
+		}
+		logBuffer.AddLogEntryToBuffer(logEntry)
+	}
+
+	// Flush (moves data to disk)
+	t.Logf("Flushing...")
+	logBuffer.ForceFlush()
+	time.Sleep(100 * time.Millisecond)
+
+	// Now try to read all messages using ReadMessagesAtOffset
+	t.Logf("\nReading messages from offset 0...")
+	messages, nextOffset, hwm, endOfPartition, err := logBuffer.ReadMessagesAtOffset(0, 1000, 1024*1024)
+
+	t.Logf("Read result: messages=%d, nextOffset=%d, hwm=%d, endOfPartition=%v, err=%v",
+		len(messages), nextOffset, hwm, endOfPartition, err)
+
+	// Verify all offsets can be read
+	readOffsets := make(map[int64]bool)
+	for _, msg := range messages {
+		readOffsets[msg.Offset] = true
+	}
+
+	missingOffsets := []int64{}
+	for expectedOffset := int64(0); expectedOffset < 100; expectedOffset++ {
+		if !readOffsets[expectedOffset] {
+			missingOffsets = append(missingOffsets, expectedOffset)
+		}
+	}
+
+	if len(missingOffsets) > 0 {
+		t.Errorf("❌ MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets))
+		if len(missingOffsets) <= 20 {
+			t.Errorf("Missing: %v", missingOffsets)
+		} else {
+			t.Errorf("Missing: %v ... and %d more", missingOffsets[:20], len(missingOffsets)-20)
+		}
+	} else {
+		t.Logf("✅ All 100 offsets can be read after flush")
+	}
+}
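The readFromDiskFn wiring above implies a routing contract for reads: offsets below bufferStartOffset must be served from disk, offsets in [bufferStartOffset, hwm) from memory, and anything at or past the high-water mark does not exist yet. Any offset below bufferStartOffset that disk cannot serve is exactly the gap these tests hunt for. A sketch of that decision (route and the source type are hypothetical):

package main

import "fmt"

type source int

const (
	fromDisk source = iota
	fromMemory
	notYetWritten
)

// route decides where a requested offset must be read from, given the
// in-memory window [bufferStart, hwm).
func route(offset, bufferStart, hwm int64) source {
	switch {
	case offset >= hwm:
		return notYetWritten
	case offset >= bufferStart:
		return fromMemory
	default:
		return fromDisk
	}
}

func main() {
	fmt.Println(route(1700, 1800, 1900)) // 0 (fromDisk)
	fmt.Println(route(1850, 1800, 1900)) // 1 (fromMemory)
	fmt.Println(route(1950, 1800, 1900)) // 2 (notYetWritten)
}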
+
+// TestFlushOffsetGap_ForceFlushAdvancesBuffer tests if ForceFlush
+// properly advances bufferStartOffset after flushing.
+func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
+	flushedRanges := []struct{ min, max int64 }{}
+	var flushMu sync.Mutex
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		flushMu.Lock()
+		flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset})
+		flushMu.Unlock()
+		t.Logf("FLUSH: offsets %d-%d", minOffset, maxOffset)
+	}
+
+	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) // Long interval, manual flush only
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Send messages, flush, check state - repeat
+	for round := 0; round < 3; round++ {
+		t.Logf("\n=== ROUND %d ===", round)
+
+		// Check state before adding messages
+		logBuffer.RLock()
+		beforeOffset := logBuffer.offset
+		beforeStart := logBuffer.bufferStartOffset
+		logBuffer.RUnlock()
+
+		t.Logf("Before adding: offset=%d, bufferStartOffset=%d", beforeOffset, beforeStart)
+
+		// Add 10 messages
+		for i := 0; i < 10; i++ {
+			logBuffer.AddToBuffer(&mq_pb.DataMessage{
+				Key:   []byte(fmt.Sprintf("round-%d-msg-%d", round, i)),
+				Value: []byte(fmt.Sprintf("data-%d-%d", round, i)),
+				TsNs:  time.Now().UnixNano(),
+			})
+		}
+
+		// Check state after adding
+		logBuffer.RLock()
+		afterAddOffset := logBuffer.offset
+		afterAddStart := logBuffer.bufferStartOffset
+		logBuffer.RUnlock()
+
+		t.Logf("After adding: offset=%d, bufferStartOffset=%d", afterAddOffset, afterAddStart)
+
+		// Force flush
+		t.Logf("Forcing flush...")
+		logBuffer.ForceFlush()
+		time.Sleep(100 * time.Millisecond)
+
+		// Check state after flush
+		logBuffer.RLock()
+		afterFlushOffset := logBuffer.offset
+		afterFlushStart := logBuffer.bufferStartOffset
+		logBuffer.RUnlock()
+
+		t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart)
+
+		// CRITICAL CHECK: bufferStartOffset should advance to where offset was before flush
+		if afterFlushStart != afterAddOffset {
+			t.Errorf("❌ FLUSH BUG: bufferStartOffset did NOT advance correctly!")
+			t.Errorf("   Expected bufferStartOffset=%d (= offset after add)", afterAddOffset)
+			t.Errorf("   Actual bufferStartOffset=%d", afterFlushStart)
+			t.Errorf("   Gap: %d offsets WILL BE LOST", afterAddOffset-afterFlushStart)
+		} else {
+			t.Logf("✅ bufferStartOffset correctly advanced to %d", afterFlushStart)
+		}
+	}
+
+	// Final verification: check all offset ranges are continuous
+	flushMu.Lock()
+	t.Logf("\n=== FLUSHED RANGES ===")
+	for i, r := range flushedRanges {
+		t.Logf("Flush #%d: offsets %d-%d", i, r.min, r.max)
+
+		// Check continuity with previous flush
+		if i > 0 {
+			prevMax := flushedRanges[i-1].max
+			currentMin := r.min
+			gap := currentMin - (prevMax + 1)
+
+			if gap > 0 {
+				t.Errorf("❌ GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap)
+			} else if gap < 0 {
+				t.Errorf("❌ OVERLAP between flush #%d and #%d: %d offsets duplicated!", i-1, i, -gap)
+			} else {
+				t.Logf("  ✅ Continuous with previous flush")
+			}
+		}
+	}
+	flushMu.Unlock()
+}
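The final loop's pairwise check generalizes to any sequence of flushed ranges: each range must start exactly one past the previous maximum, with a positive gap meaning lost offsets and a negative one meaning duplicates. As a standalone sketch (checkContinuity is a hypothetical helper):

package main

import "fmt"

type flushRange struct{ min, max int64 }

// checkContinuity returns one value per adjacent pair: gap > 0 means
// missing offsets, gap < 0 means duplicated ones, 0 means contiguous.
func checkContinuity(ranges []flushRange) []int64 {
	gaps := make([]int64, 0, len(ranges))
	for i := 1; i < len(ranges); i++ {
		gaps = append(gaps, ranges[i].min-(ranges[i-1].max+1))
	}
	return gaps
}

func main() {
	fmt.Println(checkContinuity([]flushRange{{0, 9}, {10, 19}, {36, 49}}))
	// Output: [0 16] - the second pair leaks offsets 20-35, the same shape
	// as the 1764-1799 gap in the bug report at the top of the file.
}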
