diff options
Diffstat (limited to 'weed/mq/topic/local_partition_subscribe_test.go')
| -rw-r--r-- | weed/mq/topic/local_partition_subscribe_test.go | 566 |
1 files changed, 566 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition_subscribe_test.go b/weed/mq/topic/local_partition_subscribe_test.go new file mode 100644 index 000000000..3f49432e5 --- /dev/null +++ b/weed/mq/topic/local_partition_subscribe_test.go @@ -0,0 +1,566 @@ +package topic + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" +) + +// MockLogBuffer provides a controllable log buffer for testing +type MockLogBuffer struct { + // In-memory data + memoryEntries []*filer_pb.LogEntry + memoryStartTime time.Time + memoryStopTime time.Time + memoryStartOffset int64 + memoryStopOffset int64 + + // Disk data + diskEntries []*filer_pb.LogEntry + diskStartTime time.Time + diskStopTime time.Time + diskStartOffset int64 + diskStopOffset int64 + + // Behavior control + diskReadDelay time.Duration + memoryReadDelay time.Duration + diskReadError error + memoryReadError error +} + +// MockReadFromDiskFn simulates reading from disk +func (m *MockLogBuffer) MockReadFromDiskFn(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) { + if m.diskReadDelay > 0 { + time.Sleep(m.diskReadDelay) + } + + if m.diskReadError != nil { + return startPosition, false, m.diskReadError + } + + isOffsetBased := startPosition.IsOffsetBased + lastPosition := startPosition + isDone := false + + for _, entry := range m.diskEntries { + // Filter based on mode + if isOffsetBased { + if entry.Offset < startPosition.Offset { + continue + } + } else { + entryTime := time.Unix(0, entry.TsNs) + if entryTime.Before(startPosition.Time) { + continue + } + } + + // Apply stopTsNs filter + if stopTsNs > 0 && entry.TsNs > stopTsNs { + isDone = true + break + } + + // Call handler + done, err := eachLogEntryFn(entry) + if err != nil { + return lastPosition, false, err + } + if done { + isDone = true + break + } + + // Update position + if isOffsetBased { + lastPosition = log_buffer.NewMessagePosition(entry.TsNs, entry.Offset+1) + } else { + lastPosition = log_buffer.NewMessagePosition(entry.TsNs, entry.Offset) + } + } + + return lastPosition, isDone, nil +} + +// MockLoopProcessLogDataWithOffset simulates reading from memory with offset +func (m *MockLogBuffer) MockLoopProcessLogDataWithOffset(readerName string, startPosition log_buffer.MessagePosition, stopTsNs int64, waitForDataFn func() bool, eachLogDataFn log_buffer.EachLogEntryWithOffsetFuncType) (log_buffer.MessagePosition, bool, error) { + if m.memoryReadDelay > 0 { + time.Sleep(m.memoryReadDelay) + } + + if m.memoryReadError != nil { + return startPosition, false, m.memoryReadError + } + + lastPosition := startPosition + isDone := false + + // Check if requested offset is in memory + if startPosition.Offset < m.memoryStartOffset { + // Data is on disk + return startPosition, false, log_buffer.ResumeFromDiskError + } + + for _, entry := range m.memoryEntries { + // Filter by offset + if entry.Offset < startPosition.Offset { + continue + } + + // Apply stopTsNs filter + if stopTsNs > 0 && entry.TsNs > stopTsNs { + isDone = true + break + } + + // Call handler + done, err := eachLogDataFn(entry, entry.Offset) + if err != nil { + return lastPosition, false, err + } + if done { + isDone = true + break + } + + // Update position + lastPosition = log_buffer.NewMessagePosition(entry.TsNs, entry.Offset+1) + } + + return lastPosition, isDone, nil +} + +// Helper to create test entries +func createTestEntry(offset int64, timestamp time.Time, key, value string) *filer_pb.LogEntry { + return &filer_pb.LogEntry{ + TsNs: timestamp.UnixNano(), + Offset: offset, + Key: []byte(key), + Data: []byte(value), + } +} + +// TestOffsetBasedSubscribe_AllDataInMemory tests reading when all data is in memory +func TestOffsetBasedSubscribe_AllDataInMemory(t *testing.T) { + baseTime := time.Now() + + mock := &MockLogBuffer{ + memoryEntries: []*filer_pb.LogEntry{ + createTestEntry(0, baseTime, "key0", "value0"), + createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"), + createTestEntry(2, baseTime.Add(2*time.Second), "key2", "value2"), + createTestEntry(3, baseTime.Add(3*time.Second), "key3", "value3"), + }, + memoryStartOffset: 0, + memoryStopOffset: 3, + diskEntries: []*filer_pb.LogEntry{}, // No disk data + } + + // Test reading from offset 0 + t.Run("ReadFromOffset0", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePositionFromOffset(0) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + // Simulate the Subscribe logic + // 1. Try disk read first + pos, done, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + if done { + t.Fatal("Should not be done after disk read") + } + + // 2. Read from memory + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn) + if err != nil && err != log_buffer.ResumeFromDiskError { + t.Fatalf("Memory read failed: %v", err) + } + + // Verify we got all offsets in order + expected := []int64{0, 1, 2, 3} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d", len(expected), len(receivedOffsets)) + } + for i, offset := range receivedOffsets { + if offset != expected[i] { + t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset) + } + } + }) + + // Test reading from offset 2 + t.Run("ReadFromOffset2", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePositionFromOffset(2) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + // Should skip disk and go straight to memory + pos, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + + _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn) + if err != nil && err != log_buffer.ResumeFromDiskError { + t.Fatalf("Memory read failed: %v", err) + } + + // Verify we got offsets 2, 3 + expected := []int64{2, 3} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d", len(expected), len(receivedOffsets)) + } + for i, offset := range receivedOffsets { + if offset != expected[i] { + t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset) + } + } + }) +} + +// TestOffsetBasedSubscribe_DataOnDisk tests reading when data is on disk +func TestOffsetBasedSubscribe_DataOnDisk(t *testing.T) { + baseTime := time.Now() + + mock := &MockLogBuffer{ + // Offsets 0-9 on disk + diskEntries: []*filer_pb.LogEntry{ + createTestEntry(0, baseTime, "key0", "value0"), + createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"), + createTestEntry(2, baseTime.Add(2*time.Second), "key2", "value2"), + createTestEntry(3, baseTime.Add(3*time.Second), "key3", "value3"), + createTestEntry(4, baseTime.Add(4*time.Second), "key4", "value4"), + createTestEntry(5, baseTime.Add(5*time.Second), "key5", "value5"), + createTestEntry(6, baseTime.Add(6*time.Second), "key6", "value6"), + createTestEntry(7, baseTime.Add(7*time.Second), "key7", "value7"), + createTestEntry(8, baseTime.Add(8*time.Second), "key8", "value8"), + createTestEntry(9, baseTime.Add(9*time.Second), "key9", "value9"), + }, + diskStartOffset: 0, + diskStopOffset: 9, + // Offsets 10-12 in memory + memoryEntries: []*filer_pb.LogEntry{ + createTestEntry(10, baseTime.Add(10*time.Second), "key10", "value10"), + createTestEntry(11, baseTime.Add(11*time.Second), "key11", "value11"), + createTestEntry(12, baseTime.Add(12*time.Second), "key12", "value12"), + }, + memoryStartOffset: 10, + memoryStopOffset: 12, + } + + // Test reading from offset 0 (on disk) + t.Run("ReadFromOffset0_OnDisk", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePositionFromOffset(0) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + // 1. Read from disk (should get 0-9) + pos, done, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + if done { + t.Fatal("Should not be done after disk read") + } + + // 2. Read from memory (should get 10-12) + _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn) + if err != nil && err != log_buffer.ResumeFromDiskError { + t.Fatalf("Memory read failed: %v", err) + } + + // Verify we got all offsets 0-12 in order + expected := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets) + } + for i, offset := range receivedOffsets { + if i < len(expected) && offset != expected[i] { + t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset) + } + } + }) + + // Test reading from offset 5 (on disk, middle) + t.Run("ReadFromOffset5_OnDisk", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePositionFromOffset(5) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + // 1. Read from disk (should get 5-9) + pos, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + + // 2. Read from memory (should get 10-12) + _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn) + if err != nil && err != log_buffer.ResumeFromDiskError { + t.Fatalf("Memory read failed: %v", err) + } + + // Verify we got offsets 5-12 + expected := []int64{5, 6, 7, 8, 9, 10, 11, 12} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets) + } + for i, offset := range receivedOffsets { + if i < len(expected) && offset != expected[i] { + t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset) + } + } + }) + + // Test reading from offset 11 (in memory) + t.Run("ReadFromOffset11_InMemory", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePositionFromOffset(11) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + // 1. Try disk read (should get nothing) + pos, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + + // 2. Read from memory (should get 11-12) + _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn) + if err != nil && err != log_buffer.ResumeFromDiskError { + t.Fatalf("Memory read failed: %v", err) + } + + // Verify we got offsets 11-12 + expected := []int64{11, 12} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets) + } + for i, offset := range receivedOffsets { + if i < len(expected) && offset != expected[i] { + t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset) + } + } + }) +} + +// TestTimestampBasedSubscribe tests timestamp-based reading +func TestTimestampBasedSubscribe(t *testing.T) { + baseTime := time.Now() + + mock := &MockLogBuffer{ + diskEntries: []*filer_pb.LogEntry{ + createTestEntry(0, baseTime, "key0", "value0"), + createTestEntry(1, baseTime.Add(10*time.Second), "key1", "value1"), + createTestEntry(2, baseTime.Add(20*time.Second), "key2", "value2"), + }, + memoryEntries: []*filer_pb.LogEntry{ + createTestEntry(3, baseTime.Add(30*time.Second), "key3", "value3"), + createTestEntry(4, baseTime.Add(40*time.Second), "key4", "value4"), + }, + } + + // Test reading from beginning + t.Run("ReadFromBeginning", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePosition(baseTime.UnixNano(), -1) // Timestamp-based + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + // Read from disk + _, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + + // In real scenario, would then read from memory using LoopProcessLogData + // For this test, just verify disk gave us 0-2 + expected := []int64{0, 1, 2} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d", len(expected), len(receivedOffsets)) + } + }) + + // Test reading from middle timestamp + t.Run("ReadFromMiddleTimestamp", func(t *testing.T) { + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePosition(baseTime.Add(15*time.Second).UnixNano(), -1) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + // Read from disk + _, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Disk read failed: %v", err) + } + + // Should get offset 2 only (timestamp at 20s >= 15s, offset 1 at 10s is excluded) + expected := []int64{2} + if len(receivedOffsets) != len(expected) { + t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets) + } + }) +} + +// TestConcurrentSubscribers tests multiple concurrent subscribers +func TestConcurrentSubscribers(t *testing.T) { + baseTime := time.Now() + + mock := &MockLogBuffer{ + diskEntries: []*filer_pb.LogEntry{ + createTestEntry(0, baseTime, "key0", "value0"), + createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"), + createTestEntry(2, baseTime.Add(2*time.Second), "key2", "value2"), + }, + memoryEntries: []*filer_pb.LogEntry{ + createTestEntry(3, baseTime.Add(3*time.Second), "key3", "value3"), + createTestEntry(4, baseTime.Add(4*time.Second), "key4", "value4"), + }, + memoryStartOffset: 3, + memoryStopOffset: 4, + } + + var wg sync.WaitGroup + results := make(map[string][]int64) + var mu sync.Mutex + + // Spawn 3 concurrent subscribers + for i := 0; i < 3; i++ { + wg.Add(1) + subscriberName := fmt.Sprintf("subscriber-%d", i) + + go func(name string) { + defer wg.Done() + + var receivedOffsets []int64 + startPos := log_buffer.NewMessagePositionFromOffset(0) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + receivedOffsets = append(receivedOffsets, entry.Offset) + return false, nil + } + + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + // Read from disk + pos, _, _ := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + + // Read from memory + mock.MockLoopProcessLogDataWithOffset(name, pos, 0, func() bool { return true }, eachLogWithOffsetFn) + + mu.Lock() + results[name] = receivedOffsets + mu.Unlock() + }(subscriberName) + } + + wg.Wait() + + // Verify all subscribers got the same data + expected := []int64{0, 1, 2, 3, 4} + for name, offsets := range results { + if len(offsets) != len(expected) { + t.Errorf("%s: Expected %d offsets, got %d", name, len(expected), len(offsets)) + continue + } + for i, offset := range offsets { + if offset != expected[i] { + t.Errorf("%s: Offset[%d]: expected %d, got %d", name, i, expected[i], offset) + } + } + } +} + +// TestResumeFromDiskError tests handling of ResumeFromDiskError +func TestResumeFromDiskError(t *testing.T) { + baseTime := time.Now() + + mock := &MockLogBuffer{ + diskEntries: []*filer_pb.LogEntry{ + createTestEntry(0, baseTime, "key0", "value0"), + createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"), + }, + memoryEntries: []*filer_pb.LogEntry{ + createTestEntry(10, baseTime.Add(10*time.Second), "key10", "value10"), + }, + memoryStartOffset: 10, + memoryStopOffset: 10, + } + + // Try to read offset 5, which is between disk (0-1) and memory (10) + // This should trigger ResumeFromDiskError from memory read + startPos := log_buffer.NewMessagePositionFromOffset(5) + + eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) { + return false, nil + } + + eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) { + return eachLogFn(entry) + } + + // Disk read should return no data (offset 5 > disk end) + _, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn) + if err != nil { + t.Fatalf("Unexpected disk read error: %v", err) + } + + // Memory read should return ResumeFromDiskError (offset 5 < memory start) + _, _, err = mock.MockLoopProcessLogDataWithOffset("test", startPos, 0, func() bool { return true }, eachLogWithOffsetFn) + if err != log_buffer.ResumeFromDiskError { + t.Errorf("Expected ResumeFromDiskError, got: %v", err) + } +} |
