Diffstat (limited to 'weed/util/log_buffer/log_buffer_test.go')
-rw-r--r--   weed/util/log_buffer/log_buffer_test.go | 485
1 file changed, 483 insertions, 2 deletions
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index a4947a611..7b851de06 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -3,18 +3,19 @@ package log_buffer
 import (
 	"crypto/rand"
 	"fmt"
-	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"io"
 	"sync"
 	"testing"
 	"time"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 )
 
 func TestNewLogBufferFirstBuffer(t *testing.T) {
 	flushInterval := time.Second
-	lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
+	lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
 		fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
 	}, nil, func() {
 	})
@@ -63,3 +64,483 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
 		t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
 	}
 }
+
+// TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError tests that requesting an old offset
+// that has been flushed to disk properly returns ResumeFromDiskError instead of hanging forever.
+// This reproduces the bug where Schema Registry couldn't read the _schemas topic.
+func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) {
+	tests := []struct {
+		name              string
+		bufferStartOffset int64
+		currentOffset     int64
+		requestedOffset   int64
+		hasData           bool
+		expectError       error
+		description       string
+	}{
+		{
+			name:              "Request offset 0 when buffer starts at 4 (Schema Registry bug scenario)",
+			bufferStartOffset: 4,
+			currentOffset:     10,
+			requestedOffset:   0,
+			hasData:           true,
+			expectError:       ResumeFromDiskError,
+			description:       "When Schema Registry tries to read from offset 0, but data has been flushed to disk",
+		},
+		{
+			name:              "Request offset before buffer start with empty buffer",
+			bufferStartOffset: 10,
+			currentOffset:     10,
+			requestedOffset:   5,
+			hasData:           false,
+			expectError:       ResumeFromDiskError,
+			description:       "Old offset with no data in memory should trigger disk read",
+		},
+		{
+			name:              "Request offset before buffer start with data",
+			bufferStartOffset: 100,
+			currentOffset:     150,
+			requestedOffset:   50,
+			hasData:           true,
+			expectError:       ResumeFromDiskError,
+			description:       "Old offset with current data in memory should still trigger disk read",
+		},
+		{
+			name:              "Request current offset (no disk read needed)",
+			bufferStartOffset: 4,
+			currentOffset:     10,
+			requestedOffset:   10,
+			hasData:           true,
+			expectError:       nil,
+			description:       "Current offset should return data from memory without error",
+		},
+		{
+			name:              "Request offset within buffer range",
+			bufferStartOffset: 4,
+			currentOffset:     10,
+			requestedOffset:   7,
+			hasData:           true,
+			expectError:       nil,
+			description:       "Offset within buffer range should return data without error",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a LogBuffer with minimal configuration
+			lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+
+			// Simulate data that has been flushed to disk by setting bufferStartOffset
+			lb.bufferStartOffset = tt.bufferStartOffset
+			lb.offset = tt.currentOffset
+
+			// CRITICAL: Mark this as an offset-based buffer
+			lb.hasOffsets = true
+
+			// Add some data to the buffer if needed (at current offset position)
+			if tt.hasData {
+				testData := []byte("test message")
+				// Use AddLogEntryToBuffer to preserve offset information
+				lb.AddLogEntryToBuffer(&filer_pb.LogEntry{
+					TsNs:   time.Now().UnixNano(),
+					Key:    []byte("key"),
+					Data:   testData,
+					Offset: tt.currentOffset, // Add data at current offset
+				})
+			}
+
+			// Create an offset-based position for the requested offset
+			requestPosition := NewMessagePositionFromOffset(tt.requestedOffset)
+
+			// Try to read from the buffer
+			buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+			// Verify the error matches expectations
+			if tt.expectError != nil {
+				if err != tt.expectError {
+					t.Errorf("%s\nExpected error: %v\nGot error: %v\nbuf=%v, batchIdx=%d",
+						tt.description, tt.expectError, err, buf != nil, batchIdx)
+				} else {
+					t.Logf("✓ %s: correctly returned %v", tt.description, err)
+				}
+			} else {
+				if err != nil {
+					t.Errorf("%s\nExpected no error but got: %v\nbuf=%v, batchIdx=%d",
+						tt.description, err, buf != nil, batchIdx)
+				} else {
+					t.Logf("✓ %s: correctly returned data without error", tt.description)
+				}
+			}
+		})
+	}
+}
+
+// TestReadFromBuffer_OldOffsetWithNoPrevBuffers specifically tests the bug fix
+// where requesting an old offset would return nil instead of ResumeFromDiskError
+func TestReadFromBuffer_OldOffsetWithNoPrevBuffers(t *testing.T) {
+	// This is the exact scenario that caused the Schema Registry to hang:
+	// 1. Data was published to _schemas topic (offsets 0, 1, 2, 3)
+	// 2. Data was flushed to disk
+	// 3. LogBuffer's bufferStartOffset was updated to 4
+	// 4. Schema Registry tried to read from offset 0
+	// 5. ReadFromBuffer would return (nil, offset, nil) instead of ResumeFromDiskError
+	// 6. The subscriber would wait forever for data that would never come from memory
+
+	lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})
+
+	// Simulate the state after data has been flushed to disk:
+	// - bufferStartOffset = 10 (data 0-9 has been flushed)
+	// - offset = 15 (next offset to assign, current buffer has 10-14)
+	// - pos = 100 (some data in current buffer)
+	// Set prevBuffers to have non-overlapping ranges to avoid the safety check at line 420-428
+	lb.bufferStartOffset = 10
+	lb.offset = 15
+	lb.pos = 100
+
+	// Modify prevBuffers to have non-zero offset ranges that DON'T include the requested offset
+	// This bypasses the safety check and exposes the real bug
+	for i := range lb.prevBuffers.buffers {
+		lb.prevBuffers.buffers[i].startOffset = 20 + int64(i)*10 // 20, 30, 40, etc.
+		lb.prevBuffers.buffers[i].offset = 25 + int64(i)*10      // 25, 35, 45, etc.
+		lb.prevBuffers.buffers[i].size = 0 // Empty (flushed)
+	}
+
+	// Schema Registry requests offset 5 (which is before bufferStartOffset=10)
+	requestPosition := NewMessagePositionFromOffset(5)
+
+	// Before the fix, this would return (nil, offset, nil) causing an infinite wait
+	// After the fix, this should return ResumeFromDiskError
+	buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+	t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
+	t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d",
+		lb.bufferStartOffset, lb.offset, lb.pos)
+	t.Logf("DEBUG: Requested offset 5, prevBuffers[0] range: [%d-%d]",
+		lb.prevBuffers.buffers[0].startOffset, lb.prevBuffers.buffers[0].offset)
+
+	if err != ResumeFromDiskError {
+		t.Errorf("CRITICAL BUG REPRODUCED: Expected ResumeFromDiskError but got err=%v, buf=%v, batchIdx=%d\n"+
+			"This causes Schema Registry to hang indefinitely waiting for data that's on disk!",
+			err, buf != nil, batchIdx)
+		t.Errorf("The buggy code falls through without returning ResumeFromDiskError!")
+	} else {
+		t.Logf("✓ BUG FIX VERIFIED: Correctly returns ResumeFromDiskError when requesting old offset 5")
+		t.Logf("  This allows the subscriber to read from disk instead of waiting forever")
+	}
+}
+
+// TestReadFromBuffer_EmptyBufferAtCurrentOffset tests Bug #2
+// where an empty buffer at the current offset would return empty data instead of ResumeFromDiskError
+func TestReadFromBuffer_EmptyBufferAtCurrentOffset(t *testing.T) {
+	lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})
+
+	// Simulate buffer state where data 0-3 was published and flushed, but buffer NOT advanced yet:
+	// - bufferStartOffset = 0 (buffer hasn't been advanced after flush)
+	// - offset = 4 (next offset to assign - data 0-3 exists)
+	// - pos = 0 (buffer is empty after flush)
+	// This happens in the window between flush and buffer advancement
+	lb.bufferStartOffset = 0
+	lb.offset = 4
+	lb.pos = 0
+
+	// Schema Registry requests offset 0 (which appears to be in range [0, 4])
+	requestPosition := NewMessagePositionFromOffset(0)
+
+	// BUG: Without fix, this returns empty buffer instead of checking disk
+	// FIX: Should return ResumeFromDiskError because buffer is empty (pos=0) despite valid range
+	buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+	t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
+	t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d",
+		lb.bufferStartOffset, lb.offset, lb.pos)
+
+	if err != ResumeFromDiskError {
+		if buf == nil || len(buf.Bytes()) == 0 {
+			t.Errorf("CRITICAL BUG #2 REPRODUCED: Empty buffer should return ResumeFromDiskError, got err=%v, buf=%v\n"+
+				"Without the fix, Schema Registry gets empty data instead of reading from disk!",
+				err, buf != nil)
+		}
+	} else {
+		t.Logf("✓ BUG #2 FIX VERIFIED: Empty buffer correctly returns ResumeFromDiskError to check disk")
+	}
+}
+
+// TestReadFromBuffer_OffsetRanges tests various offset range scenarios
+func TestReadFromBuffer_OffsetRanges(t *testing.T) {
+	lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+
+	// Setup: buffer contains offsets 10-20
+	lb.bufferStartOffset = 10
+	lb.offset = 20
+	lb.pos = 100 // some data in buffer
+
+	testCases := []struct {
+		name            string
+		requestedOffset int64
+		expectedError   error
+		description     string
+	}{
+		{
+			name:            "Before buffer start",
+			requestedOffset: 5,
+			expectedError:   ResumeFromDiskError,
+			description:     "Offset 5 < bufferStartOffset 10 → read from disk",
+		},
+		{
+			name:            "At buffer start",
+			requestedOffset: 10,
+			expectedError:   nil,
+			description:     "Offset 10 == bufferStartOffset 10 → read from buffer",
+		},
+		{
+			name:            "Within buffer range",
+			requestedOffset: 15,
+			expectedError:   nil,
+			description:     "Offset 15 is within [10, 20] → read from buffer",
+		},
+		{
+			name:            "At buffer end",
+			requestedOffset: 20,
+			expectedError:   nil,
+			description:     "Offset 20 == offset 20 → read from buffer",
+		},
+		{
+			name:            "After buffer end",
+			requestedOffset: 25,
+			expectedError:   nil,
+			description:     "Offset 25 > offset 20 → future data, return nil without error",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			requestPosition := NewMessagePositionFromOffset(tc.requestedOffset)
+			_, _, err := lb.ReadFromBuffer(requestPosition)
+
+			if tc.expectedError != nil {
+				if err != tc.expectedError {
+					t.Errorf("%s\nExpected error: %v, got: %v", tc.description, tc.expectedError, err)
+				} else {
+					t.Logf("✓ %s", tc.description)
+				}
+			} else {
+				// For nil expectedError, we accept either nil or no error condition
+				// (future offsets return nil without error)
+				if err != nil && err != ResumeFromDiskError {
+					t.Errorf("%s\nExpected no ResumeFromDiskError, got: %v", tc.description, err)
+				} else {
+					t.Logf("✓ %s", tc.description)
+				}
+			}
+		})
+	}
+}
+
+// TestReadFromBuffer_InitializedFromDisk tests Bug #3
+// where bufferStartOffset was incorrectly set to 0 after InitializeOffsetFromExistingData,
+// causing reads for old offsets to return new data instead of triggering a disk read.
+func TestReadFromBuffer_InitializedFromDisk(t *testing.T) {
+	// This reproduces the real Schema Registry bug scenario:
+	// 1. Broker restarts, finds 4 messages on disk (offsets 0-3)
+	// 2. InitializeOffsetFromExistingData sets offset=4
+	//    - BUG: bufferStartOffset=0 (wrong!)
+	//    - FIX: bufferStartOffset=4 (correct!)
+	// 3. First new message is written (offset 4)
+	// 4. Schema Registry reads offset 0
+	// 5. With FIX: requestedOffset=0 < bufferStartOffset=4 → ResumeFromDiskError (correct!)
+	// 6. Without FIX: requestedOffset=0 in range [0, 5] → returns wrong data (bug!)
+
+	lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})
+
+	// Use the actual InitializeOffsetFromExistingData to test the fix
+	err := lb.InitializeOffsetFromExistingData(func() (int64, error) {
+		return 3, nil // Simulate 4 messages on disk (offsets 0-3, highest=3)
+	})
+	if err != nil {
+		t.Fatalf("InitializeOffsetFromExistingData failed: %v", err)
+	}
+
+	t.Logf("After InitializeOffsetFromExistingData(highestOffset=3):")
+	t.Logf("  offset=%d (should be 4), bufferStartOffset=%d (FIX: should be 4, not 0)",
+		lb.offset, lb.bufferStartOffset)
+
+	// Now write a new message at offset 4
+	lb.AddToBuffer(&mq_pb.DataMessage{
+		Key:   []byte("new-key"),
+		Value: []byte("new-message-at-offset-4"),
+		TsNs:  time.Now().UnixNano(),
+	})
+	// After AddToBuffer: offset=5, pos>0
+
+	// Schema Registry tries to read offset 0 (should be on disk)
+	requestPosition := NewMessagePositionFromOffset(0)
+
+	buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+	t.Logf("After writing new message:")
+	t.Logf("  bufferStartOffset=%d, offset=%d, pos=%d", lb.bufferStartOffset, lb.offset, lb.pos)
+	t.Logf("  Requested offset 0, got: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
+
+	// EXPECTED BEHAVIOR (with fix):
+	// bufferStartOffset=4 after initialization, so requestedOffset=0 < bufferStartOffset=4
+	// → returns ResumeFromDiskError
+
+	// BUGGY BEHAVIOR (without fix):
+	// bufferStartOffset=0 after initialization, so requestedOffset=0 is in range [0, 5]
+	// → returns the NEW message (offset 4) instead of reading from disk!
+
+	if err != ResumeFromDiskError {
+		t.Errorf("CRITICAL BUG #3 REPRODUCED: Reading offset 0 after initialization from disk should return ResumeFromDiskError\n"+
+			"Instead got: err=%v, buf=%v, batchIdx=%d\n"+
+			"This means Schema Registry would receive WRONG data (offset 4) when requesting offset 0!",
+			err, buf != nil, batchIdx)
+		t.Errorf("Root cause: bufferStartOffset=%d should be 4 after InitializeOffsetFromExistingData(highestOffset=3)",
+			lb.bufferStartOffset)
+	} else {
+		t.Logf("✓ BUG #3 FIX VERIFIED: Reading old offset 0 correctly returns ResumeFromDiskError")
+		t.Logf("  This ensures Schema Registry reads correct data from disk instead of getting new messages")
+	}
+}
+
+// TestLoopProcessLogDataWithOffset_DiskReadRetry tests that when a subscriber
+// reads from disk before flush completes, it continues to retry disk reads
+// and eventually finds the data after flush completes.
+// This reproduces the Schema Registry timeout issue on first start.
+func TestLoopProcessLogDataWithOffset_DiskReadRetry(t *testing.T) {
+	diskReadCallCount := 0
+	diskReadMu := sync.Mutex{}
+	dataFlushedToDisk := false
+	var flushedData []*filer_pb.LogEntry
+
+	// Create a readFromDiskFn that simulates the race condition
+	readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+		diskReadMu.Lock()
+		diskReadCallCount++
+		callNum := diskReadCallCount
+		hasData := dataFlushedToDisk
+		diskReadMu.Unlock()
+
+		t.Logf("DISK READ #%d: startOffset=%d, dataFlushedToDisk=%v", callNum, startPosition.Offset, hasData)
+
+		if !hasData {
+			// Simulate: data not yet on disk (flush hasn't completed)
+			t.Logf("  → No data found (flush not completed yet)")
+			return startPosition, false, nil
+		}
+
+		// Data is now on disk, process it
+		t.Logf("  → Found %d entries on disk", len(flushedData))
+		for _, entry := range flushedData {
+			if entry.Offset >= startPosition.Offset {
+				isDone, err := eachLogEntryFn(entry)
+				if err != nil || isDone {
+					return NewMessagePositionFromOffset(entry.Offset + 1), isDone, err
+				}
+			}
+		}
+		return NewMessagePositionFromOffset(int64(len(flushedData))), false, nil
+	}
+
+	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+		t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf))
+		// Simulate writing to disk
+		diskReadMu.Lock()
+		dataFlushedToDisk = true
+		// Parse the buffer and add entries to flushedData
+		// For this test, we'll just create mock entries
+		flushedData = append(flushedData, &filer_pb.LogEntry{
+			Key:    []byte("key-0"),
+			Data:   []byte("message-0"),
+			TsNs:   time.Now().UnixNano(),
+			Offset: 0,
+		})
+		diskReadMu.Unlock()
+	}
+
+	logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil)
+	defer logBuffer.ShutdownLogBuffer()
+
+	// Simulate the race condition:
+	// 1. Subscriber starts reading from offset 0
+	// 2. Data is not yet flushed
+	// 3. Loop calls readFromDiskFn → no data found
+	// 4. A bit later, data gets flushed
+	// 5. Loop should continue and call readFromDiskFn again
+
+	receivedMessages := 0
+	mu := sync.Mutex{}
+	maxIterations := 50 // Allow up to 50 iterations (500ms with 10ms sleep each)
+	iterationCount := 0
+
+	waitForDataFn := func() bool {
+		mu.Lock()
+		defer mu.Unlock()
+		iterationCount++
+		// Stop after receiving message or max iterations
+		return receivedMessages == 0 && iterationCount < maxIterations
+	}
+
+	eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+		mu.Lock()
+		receivedMessages++
+		mu.Unlock()
+		t.Logf("✉️ RECEIVED: offset=%d key=%s", offset, string(logEntry.Key))
+		return true, nil // Stop after first message
+	}
+
+	// Start the reader in a goroutine
+	var readerWg sync.WaitGroup
+	readerWg.Add(1)
+	go func() {
+		defer readerWg.Done()
+		startPosition := NewMessagePositionFromOffset(0)
+		_, isDone, err := logBuffer.LoopProcessLogDataWithOffset("test-subscriber", startPosition, 0, waitForDataFn, eachLogEntryFn)
+		t.Logf("📋 Reader finished: isDone=%v, err=%v", isDone, err)
+	}()
+
+	// Wait a bit to let the first disk read happen (returns no data)
+	time.Sleep(50 * time.Millisecond)
+
+	// Now add data and flush it
+	t.Logf("➕ Adding message to buffer...")
+	logBuffer.AddToBuffer(&mq_pb.DataMessage{
+		Key:   []byte("key-0"),
+		Value: []byte("message-0"),
+		TsNs:  time.Now().UnixNano(),
+	})
+
+	// Force flush
+	t.Logf("Force flushing...")
+	logBuffer.ForceFlush()
+
+	// Wait for reader to finish
+	readerWg.Wait()
+
+	// Check results
+	diskReadMu.Lock()
+	finalDiskReadCount := diskReadCallCount
+	diskReadMu.Unlock()
+
+	mu.Lock()
+	finalReceivedMessages := receivedMessages
+	finalIterations := iterationCount
+	mu.Unlock()
+
+	t.Logf("\nRESULTS:")
+	t.Logf("  Disk reads: %d", finalDiskReadCount)
+	t.Logf("  Received messages: %d", finalReceivedMessages)
+	t.Logf("  Loop iterations: %d", finalIterations)
+
+	if finalDiskReadCount < 2 {
+		t.Errorf("CRITICAL BUG REPRODUCED: Disk read was only called %d time(s)", finalDiskReadCount)
+		t.Errorf("Expected: Multiple disk reads as the loop continues after flush completes")
+		t.Errorf("This is why Schema Registry times out - it reads once before flush, never re-reads after flush")
+	}
+
+	if finalReceivedMessages == 0 {
+		t.Errorf("SCHEMA REGISTRY TIMEOUT REPRODUCED: No messages received even after flush")
+		t.Errorf("The subscriber is stuck because disk reads are not retried")
+	} else {
+		t.Logf("✓ SUCCESS: Message received after %d disk read attempts", finalDiskReadCount)
+	}
+}
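
Taken together, the new tests pin down one routing decision in ReadFromBuffer: offsets older than the in-memory window, or an in-range offset against a just-flushed empty buffer, must surface ResumeFromDiskError; offsets inside the window are served from memory; future offsets return nothing without error. The following is a minimal sketch of that rule for reading the tests, using the field names the tests set directly (bufferStartOffset, offset, pos); its boundary handling is an assumption, not the actual ReadFromBuffer implementation.

// readDecision sketches the routing the tests above assert; it is not the real code.
func readDecision(bufferStartOffset, offset int64, pos int, requested int64) string {
	switch {
	case requested < bufferStartOffset:
		// Bugs #1 and #3: the data was already flushed, so the caller must fall back to disk.
		return "ResumeFromDiskError"
	case requested > offset:
		// Future offset: return nothing and let the subscriber keep polling.
		return "wait for new data"
	case pos == 0:
		// Bug #2: nominally in range, but the buffer is empty right after a flush.
		return "ResumeFromDiskError"
	default:
		// Offset inside [bufferStartOffset, offset] with data present.
		return "serve from memory"
	}
}

The retry test then covers the disk side of the same path: once a read is routed to disk, the subscriber loop has to keep invoking readFromDiskFn until the flush lands, rather than giving up after the first empty read.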
