path: root/weed/mq/topic/local_partition_subscribe_test.go
Diffstat (limited to 'weed/mq/topic/local_partition_subscribe_test.go')
-rw-r--r--  weed/mq/topic/local_partition_subscribe_test.go  566
1 file changed, 566 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition_subscribe_test.go b/weed/mq/topic/local_partition_subscribe_test.go
new file mode 100644
index 000000000..3f49432e5
--- /dev/null
+++ b/weed/mq/topic/local_partition_subscribe_test.go
@@ -0,0 +1,566 @@
+package topic
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+)
+
+// MockLogBuffer provides a controllable log buffer for testing
+type MockLogBuffer struct {
+ // In-memory data
+ memoryEntries []*filer_pb.LogEntry
+ memoryStartTime time.Time
+ memoryStopTime time.Time
+ memoryStartOffset int64
+ memoryStopOffset int64
+
+ // Disk data
+ diskEntries []*filer_pb.LogEntry
+ diskStartTime time.Time
+ diskStopTime time.Time
+ diskStartOffset int64
+ diskStopOffset int64
+
+ // Behavior control
+ diskReadDelay time.Duration
+ memoryReadDelay time.Duration
+ diskReadError error
+ memoryReadError error
+}
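+
+// The tests below drive the two mock readers in the order the subscribe path
+// is expected to use them: a disk read first to catch up on historical
+// entries, then an offset-aware read of the in-memory buffer for newer data.
+// A memory read that starts below memoryStartOffset returns
+// log_buffer.ResumeFromDiskError, signalling the caller to fall back to disk.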
+
+// MockReadFromDiskFn simulates reading from disk
+func (m *MockLogBuffer) MockReadFromDiskFn(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
+ if m.diskReadDelay > 0 {
+ time.Sleep(m.diskReadDelay)
+ }
+
+ if m.diskReadError != nil {
+ return startPosition, false, m.diskReadError
+ }
+
+ isOffsetBased := startPosition.IsOffsetBased
+ lastPosition := startPosition
+ isDone := false
+
+ for _, entry := range m.diskEntries {
+ // Filter based on mode
+ if isOffsetBased {
+ if entry.Offset < startPosition.Offset {
+ continue
+ }
+ } else {
+ entryTime := time.Unix(0, entry.TsNs)
+ if entryTime.Before(startPosition.Time) {
+ continue
+ }
+ }
+
+ // Apply stopTsNs filter
+ if stopTsNs > 0 && entry.TsNs > stopTsNs {
+ isDone = true
+ break
+ }
+
+ // Call handler
+ done, err := eachLogEntryFn(entry)
+ if err != nil {
+ return lastPosition, false, err
+ }
+ if done {
+ isDone = true
+ break
+ }
+
+ // Advance the position: offset-based reads resume at the next offset,
+ // timestamp-based reads keep the entry's own offset
+ if isOffsetBased {
+ lastPosition = log_buffer.NewMessagePosition(entry.TsNs, entry.Offset+1)
+ } else {
+ lastPosition = log_buffer.NewMessagePosition(entry.TsNs, entry.Offset)
+ }
+ }
+
+ return lastPosition, isDone, nil
+}
+
+// MockLoopProcessLogDataWithOffset simulates an offset-aware read from the in-memory buffer
+func (m *MockLogBuffer) MockLoopProcessLogDataWithOffset(readerName string, startPosition log_buffer.MessagePosition, stopTsNs int64, waitForDataFn func() bool, eachLogDataFn log_buffer.EachLogEntryWithOffsetFuncType) (log_buffer.MessagePosition, bool, error) {
+ if m.memoryReadDelay > 0 {
+ time.Sleep(m.memoryReadDelay)
+ }
+
+ if m.memoryReadError != nil {
+ return startPosition, false, m.memoryReadError
+ }
+
+ lastPosition := startPosition
+ isDone := false
+
+ // Check if requested offset is in memory
+ if startPosition.Offset < m.memoryStartOffset {
+ // Requested offset is older than anything held in memory; caller must resume from disk
+ return startPosition, false, log_buffer.ResumeFromDiskError
+ }
+
+ for _, entry := range m.memoryEntries {
+ // Filter by offset
+ if entry.Offset < startPosition.Offset {
+ continue
+ }
+
+ // Apply stopTsNs filter
+ if stopTsNs > 0 && entry.TsNs > stopTsNs {
+ isDone = true
+ break
+ }
+
+ // Call handler
+ done, err := eachLogDataFn(entry, entry.Offset)
+ if err != nil {
+ return lastPosition, false, err
+ }
+ if done {
+ isDone = true
+ break
+ }
+
+ // Advance the position so the next read resumes at the following offset
+ lastPosition = log_buffer.NewMessagePosition(entry.TsNs, entry.Offset+1)
+ }
+
+ return lastPosition, isDone, nil
+}
+
+// createTestEntry builds a filer_pb.LogEntry with the given offset, timestamp, key, and value
+func createTestEntry(offset int64, timestamp time.Time, key, value string) *filer_pb.LogEntry {
+ return &filer_pb.LogEntry{
+ TsNs: timestamp.UnixNano(),
+ Offset: offset,
+ Key: []byte(key),
+ Data: []byte(value),
+ }
+}
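+
+// As a minimal sketch (not part of the production API), the two-phase read
+// that each test performs by hand could be wrapped in a hypothetical helper
+// along these lines:
+//
+//    func readAll(mock *MockLogBuffer, startOffset int64) ([]int64, error) {
+//        var offsets []int64
+//        eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+//            offsets = append(offsets, entry.Offset)
+//            return false, nil
+//        }
+//        pos := log_buffer.NewMessagePositionFromOffset(startOffset)
+//        pos, _, err := mock.MockReadFromDiskFn(pos, 0, eachLogFn) // catch up from disk
+//        if err != nil {
+//            return nil, err
+//        }
+//        _, _, err = mock.MockLoopProcessLogDataWithOffset("sketch", pos, 0,
+//            func() bool { return true },
+//            func(entry *filer_pb.LogEntry, _ int64) (bool, error) { return eachLogFn(entry) })
+//        if err != nil && err != log_buffer.ResumeFromDiskError {
+//            return nil, err // ResumeFromDiskError only signals a disk fallback, not a failure
+//        }
+//        return offsets, nil
+//    }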
+
+// TestOffsetBasedSubscribe_AllDataInMemory tests reading when all data is in memory
+func TestOffsetBasedSubscribe_AllDataInMemory(t *testing.T) {
+ baseTime := time.Now()
+
+ mock := &MockLogBuffer{
+ memoryEntries: []*filer_pb.LogEntry{
+ createTestEntry(0, baseTime, "key0", "value0"),
+ createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"),
+ createTestEntry(2, baseTime.Add(2*time.Second), "key2", "value2"),
+ createTestEntry(3, baseTime.Add(3*time.Second), "key3", "value3"),
+ },
+ memoryStartOffset: 0,
+ memoryStopOffset: 3,
+ diskEntries: []*filer_pb.LogEntry{}, // No disk data
+ }
+
+ // Test reading from offset 0
+ t.Run("ReadFromOffset0", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePositionFromOffset(0)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ // Simulate the Subscribe logic
+ // 1. Try disk read first
+ pos, done, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+ if done {
+ t.Fatal("Should not be done after disk read")
+ }
+
+ // 2. Read from memory
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn)
+ if err != nil && err != log_buffer.ResumeFromDiskError {
+ t.Fatalf("Memory read failed: %v", err)
+ }
+
+ // Verify we got all offsets in order
+ expected := []int64{0, 1, 2, 3}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d", len(expected), len(receivedOffsets))
+ }
+ for i, offset := range receivedOffsets {
+ if offset != expected[i] {
+ t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset)
+ }
+ }
+ })
+
+ // Test reading from offset 2
+ t.Run("ReadFromOffset2", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePositionFromOffset(2)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ // The mock has no disk entries, so the disk read returns nothing and all data comes from memory
+ pos, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+
+ _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn)
+ if err != nil && err != log_buffer.ResumeFromDiskError {
+ t.Fatalf("Memory read failed: %v", err)
+ }
+
+ // Verify we got offsets 2, 3
+ expected := []int64{2, 3}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d", len(expected), len(receivedOffsets))
+ }
+ for i, offset := range receivedOffsets {
+ if offset != expected[i] {
+ t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset)
+ }
+ }
+ })
+}
+
+// TestOffsetBasedSubscribe_DataOnDisk tests reading when data is on disk
+func TestOffsetBasedSubscribe_DataOnDisk(t *testing.T) {
+ baseTime := time.Now()
+
+ mock := &MockLogBuffer{
+ // Offsets 0-9 on disk
+ diskEntries: []*filer_pb.LogEntry{
+ createTestEntry(0, baseTime, "key0", "value0"),
+ createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"),
+ createTestEntry(2, baseTime.Add(2*time.Second), "key2", "value2"),
+ createTestEntry(3, baseTime.Add(3*time.Second), "key3", "value3"),
+ createTestEntry(4, baseTime.Add(4*time.Second), "key4", "value4"),
+ createTestEntry(5, baseTime.Add(5*time.Second), "key5", "value5"),
+ createTestEntry(6, baseTime.Add(6*time.Second), "key6", "value6"),
+ createTestEntry(7, baseTime.Add(7*time.Second), "key7", "value7"),
+ createTestEntry(8, baseTime.Add(8*time.Second), "key8", "value8"),
+ createTestEntry(9, baseTime.Add(9*time.Second), "key9", "value9"),
+ },
+ diskStartOffset: 0,
+ diskStopOffset: 9,
+ // Offsets 10-12 in memory
+ memoryEntries: []*filer_pb.LogEntry{
+ createTestEntry(10, baseTime.Add(10*time.Second), "key10", "value10"),
+ createTestEntry(11, baseTime.Add(11*time.Second), "key11", "value11"),
+ createTestEntry(12, baseTime.Add(12*time.Second), "key12", "value12"),
+ },
+ memoryStartOffset: 10,
+ memoryStopOffset: 12,
+ }
+
+ // Test reading from offset 0 (on disk)
+ t.Run("ReadFromOffset0_OnDisk", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePositionFromOffset(0)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ // 1. Read from disk (should get 0-9)
+ pos, done, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+ if done {
+ t.Fatal("Should not be done after disk read")
+ }
+
+ // 2. Read from memory (should get 10-12)
+ _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn)
+ if err != nil && err != log_buffer.ResumeFromDiskError {
+ t.Fatalf("Memory read failed: %v", err)
+ }
+
+ // Verify we got all offsets 0-12 in order
+ expected := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets)
+ }
+ for i, offset := range receivedOffsets {
+ if i < len(expected) && offset != expected[i] {
+ t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset)
+ }
+ }
+ })
+
+ // Test reading from offset 5 (on disk, middle)
+ t.Run("ReadFromOffset5_OnDisk", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePositionFromOffset(5)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ // 1. Read from disk (should get 5-9)
+ pos, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+
+ // 2. Read from memory (should get 10-12)
+ _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn)
+ if err != nil && err != log_buffer.ResumeFromDiskError {
+ t.Fatalf("Memory read failed: %v", err)
+ }
+
+ // Verify we got offsets 5-12
+ expected := []int64{5, 6, 7, 8, 9, 10, 11, 12}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets)
+ }
+ for i, offset := range receivedOffsets {
+ if i < len(expected) && offset != expected[i] {
+ t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset)
+ }
+ }
+ })
+
+ // Test reading from offset 11 (in memory)
+ t.Run("ReadFromOffset11_InMemory", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePositionFromOffset(11)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ // 1. Try disk read (should get nothing)
+ pos, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+
+ // 2. Read from memory (should get 11-12)
+ _, _, err = mock.MockLoopProcessLogDataWithOffset("test", pos, 0, func() bool { return true }, eachLogWithOffsetFn)
+ if err != nil && err != log_buffer.ResumeFromDiskError {
+ t.Fatalf("Memory read failed: %v", err)
+ }
+
+ // Verify we got offsets 11-12
+ expected := []int64{11, 12}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets)
+ }
+ for i, offset := range receivedOffsets {
+ if i < len(expected) && offset != expected[i] {
+ t.Errorf("Offset[%d]: expected %d, got %d", i, expected[i], offset)
+ }
+ }
+ })
+}
+
+// TestTimestampBasedSubscribe tests timestamp-based reading
+func TestTimestampBasedSubscribe(t *testing.T) {
+ baseTime := time.Now()
+
+ mock := &MockLogBuffer{
+ diskEntries: []*filer_pb.LogEntry{
+ createTestEntry(0, baseTime, "key0", "value0"),
+ createTestEntry(1, baseTime.Add(10*time.Second), "key1", "value1"),
+ createTestEntry(2, baseTime.Add(20*time.Second), "key2", "value2"),
+ },
+ memoryEntries: []*filer_pb.LogEntry{
+ createTestEntry(3, baseTime.Add(30*time.Second), "key3", "value3"),
+ createTestEntry(4, baseTime.Add(40*time.Second), "key4", "value4"),
+ },
+ }
+
+ // Test reading from beginning
+ t.Run("ReadFromBeginning", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePosition(baseTime.UnixNano(), -1) // Timestamp-based
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ // Read from disk
+ _, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+
+ // In a real scenario the subscriber would then read from memory using LoopProcessLogData;
+ // for this test, just verify the disk read returned offsets 0-2
+ expected := []int64{0, 1, 2}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d", len(expected), len(receivedOffsets))
+ }
+ })
+
+ // Test reading from middle timestamp
+ t.Run("ReadFromMiddleTimestamp", func(t *testing.T) {
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePosition(baseTime.Add(15*time.Second).UnixNano(), -1)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ // Read from disk
+ _, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Disk read failed: %v", err)
+ }
+
+ // Only offset 2 qualifies: its timestamp (20s) is at or after the 15s start, while offsets 0 (0s) and 1 (10s) fall before it
+ expected := []int64{2}
+ if len(receivedOffsets) != len(expected) {
+ t.Errorf("Expected %d offsets, got %d: %v", len(expected), len(receivedOffsets), receivedOffsets)
+ }
+ })
+}
+
+// TestConcurrentSubscribers tests multiple concurrent subscribers
+func TestConcurrentSubscribers(t *testing.T) {
+ baseTime := time.Now()
+
+ mock := &MockLogBuffer{
+ diskEntries: []*filer_pb.LogEntry{
+ createTestEntry(0, baseTime, "key0", "value0"),
+ createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"),
+ createTestEntry(2, baseTime.Add(2*time.Second), "key2", "value2"),
+ },
+ memoryEntries: []*filer_pb.LogEntry{
+ createTestEntry(3, baseTime.Add(3*time.Second), "key3", "value3"),
+ createTestEntry(4, baseTime.Add(4*time.Second), "key4", "value4"),
+ },
+ memoryStartOffset: 3,
+ memoryStopOffset: 4,
+ }
+
+ var wg sync.WaitGroup
+ results := make(map[string][]int64)
+ var mu sync.Mutex
+
+ // Spawn 3 concurrent subscribers
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ subscriberName := fmt.Sprintf("subscriber-%d", i)
+
+ go func(name string) {
+ defer wg.Done()
+
+ var receivedOffsets []int64
+ startPos := log_buffer.NewMessagePositionFromOffset(0)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ receivedOffsets = append(receivedOffsets, entry.Offset)
+ return false, nil
+ }
+
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ // Read from disk
+ pos, _, _ := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+
+ // Read from memory
+ mock.MockLoopProcessLogDataWithOffset(name, pos, 0, func() bool { return true }, eachLogWithOffsetFn)
+
+ mu.Lock()
+ results[name] = receivedOffsets
+ mu.Unlock()
+ }(subscriberName)
+ }
+
+ wg.Wait()
+
+ // Verify all subscribers got the same data
+ expected := []int64{0, 1, 2, 3, 4}
+ for name, offsets := range results {
+ if len(offsets) != len(expected) {
+ t.Errorf("%s: Expected %d offsets, got %d", name, len(expected), len(offsets))
+ continue
+ }
+ for i, offset := range offsets {
+ if offset != expected[i] {
+ t.Errorf("%s: Offset[%d]: expected %d, got %d", name, i, expected[i], offset)
+ }
+ }
+ }
+}
+
+// TestResumeFromDiskError tests handling of ResumeFromDiskError
+func TestResumeFromDiskError(t *testing.T) {
+ baseTime := time.Now()
+
+ mock := &MockLogBuffer{
+ diskEntries: []*filer_pb.LogEntry{
+ createTestEntry(0, baseTime, "key0", "value0"),
+ createTestEntry(1, baseTime.Add(1*time.Second), "key1", "value1"),
+ },
+ memoryEntries: []*filer_pb.LogEntry{
+ createTestEntry(10, baseTime.Add(10*time.Second), "key10", "value10"),
+ },
+ memoryStartOffset: 10,
+ memoryStopOffset: 10,
+ }
+
+ // Try to read offset 5, which falls in the gap between disk (offsets 0-1) and memory (offset 10).
+ // The memory read should return ResumeFromDiskError
+ startPos := log_buffer.NewMessagePositionFromOffset(5)
+
+ eachLogFn := func(entry *filer_pb.LogEntry) (bool, error) {
+ return false, nil
+ }
+
+ eachLogWithOffsetFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachLogFn(entry)
+ }
+
+ // Disk read should return no data (offset 5 > disk end)
+ _, _, err := mock.MockReadFromDiskFn(startPos, 0, eachLogFn)
+ if err != nil {
+ t.Fatalf("Unexpected disk read error: %v", err)
+ }
+
+ // Memory read should return ResumeFromDiskError (offset 5 < memory start)
+ _, _, err = mock.MockLoopProcessLogDataWithOffset("test", startPos, 0, func() bool { return true }, eachLogWithOffsetFn)
+ if err != log_buffer.ResumeFromDiskError {
+ t.Errorf("Expected ResumeFromDiskError, got: %v", err)
+ }
+}