Diffstat (limited to 'weed/util/log_buffer/log_read_integration_test.go')
| -rw-r--r-- | weed/util/log_buffer/log_read_integration_test.go | 353 |
1 file changed, 353 insertions(+), 0 deletions(-)
diff --git a/weed/util/log_buffer/log_read_integration_test.go b/weed/util/log_buffer/log_read_integration_test.go
new file mode 100644
index 000000000..38549b9f7
--- /dev/null
+++ b/weed/util/log_buffer/log_read_integration_test.go
@@ -0,0 +1,353 @@
+package log_buffer
+
+import (
+    "sync"
+    "sync/atomic"
+    "testing"
+    "time"
+
+    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// TestConcurrentProducerConsumer simulates the integration test scenario:
+// - One producer writing messages continuously
+// - Multiple consumers reading from different offsets
+// - Consumers reading sequentially (like Kafka consumers)
+func TestConcurrentProducerConsumer(t *testing.T) {
+    lb := NewLogBuffer("integration-test", time.Hour, nil, nil, func() {})
+    lb.hasOffsets = true
+
+    const numMessages = 1000
+    const numConsumers = 2
+    const messagesPerConsumer = numMessages / numConsumers
+
+    // Start producer
+    producerDone := make(chan bool)
+    go func() {
+        for i := 0; i < numMessages; i++ {
+            entry := &filer_pb.LogEntry{
+                TsNs:   time.Now().UnixNano(),
+                Key:    []byte("key"),
+                Data:   []byte("value"),
+                Offset: int64(i),
+            }
+            lb.AddLogEntryToBuffer(entry)
+            time.Sleep(1 * time.Millisecond) // Simulate production rate
+        }
+        producerDone <- true
+    }()
+
+    // Start consumers
+    consumerWg := sync.WaitGroup{}
+    consumerErrors := make(chan error, numConsumers)
+    consumedCounts := make([]int64, numConsumers)
+
+    for consumerID := 0; consumerID < numConsumers; consumerID++ {
+        consumerWg.Add(1)
+        go func(id int, startOffset int64, endOffset int64) {
+            defer consumerWg.Done()
+
+            currentOffset := startOffset
+            for currentOffset < endOffset {
+                // Read 10 messages at a time (like integration test)
+                messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
+                if err != nil {
+                    consumerErrors <- err
+                    return
+                }
+
+                if len(messages) == 0 {
+                    // No data yet, wait a bit
+                    time.Sleep(5 * time.Millisecond)
+                    continue
+                }
+
+                // Count only messages in this consumer's assigned range
+                messagesInRange := 0
+                for i, msg := range messages {
+                    if msg.Offset >= startOffset && msg.Offset < endOffset {
+                        messagesInRange++
+                        expectedOffset := currentOffset + int64(i)
+                        if msg.Offset != expectedOffset {
+                            t.Errorf("Consumer %d: Expected offset %d, got %d", id, expectedOffset, msg.Offset)
+                        }
+                    }
+                }
+
+                atomic.AddInt64(&consumedCounts[id], int64(messagesInRange))
+                currentOffset = nextOffset
+            }
+        }(consumerID, int64(consumerID*messagesPerConsumer), int64((consumerID+1)*messagesPerConsumer))
+    }
+
+    // Wait for producer to finish
+    <-producerDone
+
+    // Wait for consumers (with timeout)
+    done := make(chan bool)
+    go func() {
+        consumerWg.Wait()
+        done <- true
+    }()
+
+    select {
+    case <-done:
+        // Success
+    case err := <-consumerErrors:
+        t.Fatalf("Consumer error: %v", err)
+    case <-time.After(10 * time.Second):
+        t.Fatal("Timeout waiting for consumers to finish")
+    }
+
+    // Verify all messages were consumed
+    totalConsumed := int64(0)
+    for i, count := range consumedCounts {
+        t.Logf("Consumer %d consumed %d messages", i, count)
+        totalConsumed += count
+    }
+
+    if totalConsumed != numMessages {
+        t.Errorf("Expected to consume %d messages, but consumed %d", numMessages, totalConsumed)
+    }
+}
+
+// TestBackwardSeeksWhileProducing simulates consumer rebalancing where
+// consumers seek backward to earlier offsets while the producer is still writing
+func TestBackwardSeeksWhileProducing(t *testing.T) {
+    lb := NewLogBuffer("backward-seek-test", time.Hour, nil, nil, func() {})
+    lb.hasOffsets = true
+
+    const numMessages = 500
+    const numSeeks = 10
+
+    // Start producer
+    producerDone := make(chan bool)
+    go func() {
+        for i := 0; i < numMessages; i++ {
+            entry := &filer_pb.LogEntry{
+                TsNs:   time.Now().UnixNano(),
+                Key:    []byte("key"),
+                Data:   []byte("value"),
+                Offset: int64(i),
+            }
+            lb.AddLogEntryToBuffer(entry)
+            time.Sleep(1 * time.Millisecond)
+        }
+        producerDone <- true
+    }()
+
+    // Consumer that seeks backward periodically
+    consumerDone := make(chan bool)
+    readOffsets := make(map[int64]int) // Track how many times each offset was read
+
+    go func() {
+        currentOffset := int64(0)
+        seeksRemaining := numSeeks
+
+        for currentOffset < numMessages {
+            // Read some messages
+            messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
+            if err != nil {
+                // For stateless reads, "offset out of range" means data not in memory yet
+                // This is expected when reading historical data or before production starts
+                time.Sleep(5 * time.Millisecond)
+                continue
+            }
+
+            if len(messages) == 0 {
+                // No data available yet or caught up to producer
+                if !endOfPartition {
+                    // Data might be coming, wait
+                    time.Sleep(5 * time.Millisecond)
+                } else {
+                    // At end of partition, wait for more production
+                    time.Sleep(5 * time.Millisecond)
+                }
+                continue
+            }
+
+            // Track read offsets
+            for _, msg := range messages {
+                readOffsets[msg.Offset]++
+            }
+
+            // Periodically seek backward (simulating rebalancing)
+            if seeksRemaining > 0 && nextOffset > 50 && nextOffset%100 == 0 {
+                seekOffset := nextOffset - 20
+                t.Logf("Seeking backward from %d to %d", nextOffset, seekOffset)
+                currentOffset = seekOffset
+                seeksRemaining--
+            } else {
+                currentOffset = nextOffset
+            }
+        }
+
+        consumerDone <- true
+    }()
+
+    // Wait for both
+    <-producerDone
+    <-consumerDone
+
+    // Verify each offset was read at least once
+    for i := int64(0); i < numMessages; i++ {
+        if readOffsets[i] == 0 {
+            t.Errorf("Offset %d was never read", i)
+        }
+    }
+
+    t.Logf("Total unique offsets read: %d out of %d", len(readOffsets), numMessages)
+}
+
+// TestHighConcurrencyReads simulates multiple consumers reading from
+// different offsets simultaneously (stress test)
+func TestHighConcurrencyReads(t *testing.T) {
+    lb := NewLogBuffer("high-concurrency-test", time.Hour, nil, nil, func() {})
+    lb.hasOffsets = true
+
+    const numMessages = 1000
+    const numReaders = 10
+
+    // Pre-populate buffer
+    for i := 0; i < numMessages; i++ {
+        entry := &filer_pb.LogEntry{
+            TsNs:   time.Now().UnixNano(),
+            Key:    []byte("key"),
+            Data:   []byte("value"),
+            Offset: int64(i),
+        }
+        lb.AddLogEntryToBuffer(entry)
+    }
+
+    // Start many concurrent readers at different offsets
+    wg := sync.WaitGroup{}
+    errors := make(chan error, numReaders)
+
+    for reader := 0; reader < numReaders; reader++ {
+        wg.Add(1)
+        go func(startOffset int64) {
+            defer wg.Done()
+
+            // Read 100 messages from this offset
+            currentOffset := startOffset
+            readCount := 0
+
+            for readCount < 100 && currentOffset < numMessages {
+                messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
+                if err != nil {
+                    errors <- err
+                    return
+                }
+
+                // Verify offsets are sequential
+                for i, msg := range messages {
+                    expected := currentOffset + int64(i)
+                    if msg.Offset != expected {
+                        t.Errorf("Reader at %d: expected offset %d, got %d", startOffset, expected, msg.Offset)
+                    }
+                }
+
+                readCount += len(messages)
+                currentOffset = nextOffset
+            }
+        }(int64(reader * 10))
+    }
+
+    // Wait with timeout
+    done := make(chan bool)
+    go func() {
+        wg.Wait()
+        done <- true
+    }()
+
+    select {
+    case <-done:
+        // Success
+    case err := <-errors:
+        t.Fatalf("Reader error: %v", err)
+    case <-time.After(10 * time.Second):
+        t.Fatal("Timeout waiting for readers")
+    }
+}
+
+// TestRepeatedReadsAtSameOffset simulates what happens when a Kafka
+// consumer re-fetches the same offset multiple times (due to timeouts or retries)
+func TestRepeatedReadsAtSameOffset(t *testing.T) {
+    lb := NewLogBuffer("repeated-reads-test", time.Hour, nil, nil, func() {})
+    lb.hasOffsets = true
+
+    const numMessages = 100
+
+    // Pre-populate buffer
+    for i := 0; i < numMessages; i++ {
+        entry := &filer_pb.LogEntry{
+            TsNs:   time.Now().UnixNano(),
+            Key:    []byte("key"),
+            Data:   []byte("value"),
+            Offset: int64(i),
+        }
+        lb.AddLogEntryToBuffer(entry)
+    }
+
+    // Read the same offset multiple times concurrently
+    const numReads = 10
+    const testOffset = int64(50)
+
+    wg := sync.WaitGroup{}
+    results := make([][]*filer_pb.LogEntry, numReads)
+
+    for i := 0; i < numReads; i++ {
+        wg.Add(1)
+        go func(idx int) {
+            defer wg.Done()
+            messages, _, _, _, err := lb.ReadMessagesAtOffset(testOffset, 10, 10240)
+            if err != nil {
+                t.Errorf("Read %d error: %v", idx, err)
+                return
+            }
+            results[idx] = messages
+        }(i)
+    }
+
+    wg.Wait()
+
+    // Verify all reads returned the same data
+    firstRead := results[0]
+    for i := 1; i < numReads; i++ {
+        if len(results[i]) != len(firstRead) {
+            t.Errorf("Read %d returned %d messages, expected %d", i, len(results[i]), len(firstRead))
+        }
+
+        for j := range results[i] {
+            if results[i][j].Offset != firstRead[j].Offset {
+                t.Errorf("Read %d message %d has offset %d, expected %d",
+                    i, j, results[i][j].Offset, firstRead[j].Offset)
+            }
+        }
+    }
+}
+
+// TestEmptyPartitionPolling simulates consumers polling empty partitions
+// waiting for data (common in Kafka)
+func TestEmptyPartitionPolling(t *testing.T) {
+    lb := NewLogBuffer("empty-partition-test", time.Hour, nil, nil, func() {})
+    lb.hasOffsets = true
+    lb.bufferStartOffset = 0
+    lb.offset = 0
+
+    // Try to read from empty partition
+    messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 10240)
+
+    if err != nil {
+        t.Errorf("Unexpected error: %v", err)
+    }
+    if len(messages) != 0 {
+        t.Errorf("Expected 0 messages, got %d", len(messages))
+    }
+    if nextOffset != 0 {
+        t.Errorf("Expected nextOffset=0, got %d", nextOffset)
+    }
+    if !endOfPartition {
+        t.Error("Expected endOfPartition=true for future offset")
+    }
+}
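
All five tests exercise the same stateless fetch loop: read up to N messages at an offset, advance to the returned nextOffset, and back off briefly when nothing is buffered yet. The sketch below is illustrative only, not part of the commit: it uses only the calls that appear in this diff (NewLogBuffer, AddLogEntryToBuffer, ReadMessagesAtOffset, with the third return value ignored exactly as in the tests), and like the tests it would have to live in package log_buffer because it sets the unexported hasOffsets field. The fetchLoopSketch name, numProduced constant, and Printf output are made up for illustration.

// fetchLoopSketch is a minimal, assumption-laden sketch of the consumer
// pattern the tests above simulate; it is not taken from the SeaweedFS
// sources beyond this diff.
package log_buffer

import (
    "fmt"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

func fetchLoopSketch() {
    lb := NewLogBuffer("fetch-loop-sketch", time.Hour, nil, nil, func() {})
    lb.hasOffsets = true

    // Produce a small batch, mirroring the producer goroutines in the tests.
    const numProduced = 20
    for i := 0; i < numProduced; i++ {
        lb.AddLogEntryToBuffer(&filer_pb.LogEntry{
            TsNs:   time.Now().UnixNano(),
            Key:    []byte("key"),
            Data:   []byte("value"),
            Offset: int64(i),
        })
    }

    // Consume: up to 10 messages (10 KB cap) per call, advance to nextOffset,
    // and back off briefly when no data is available yet.
    currentOffset := int64(0)
    for currentOffset < numProduced {
        messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240)
        if err != nil {
            // The tests treat an error here as "data not in memory yet" and retry.
            time.Sleep(5 * time.Millisecond)
            continue
        }
        if len(messages) == 0 {
            if endOfPartition {
                break // nothing more buffered; a live consumer would keep polling
            }
            time.Sleep(5 * time.Millisecond)
            continue
        }
        for _, msg := range messages {
            fmt.Printf("offset=%d key=%s\n", msg.Offset, msg.Key)
        }
        currentOffset = nextOffset
    }
}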
