diff options
Diffstat (limited to 'weed/mq/broker/broker_offset_integration_test.go')
| -rw-r--r-- | weed/mq/broker/broker_offset_integration_test.go | 351 |
1 files changed, 351 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_offset_integration_test.go b/weed/mq/broker/broker_offset_integration_test.go new file mode 100644 index 000000000..49df58a64 --- /dev/null +++ b/weed/mq/broker/broker_offset_integration_test.go @@ -0,0 +1,351 @@ +package broker + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func createTestTopic() topic.Topic { + return topic.Topic{ + Namespace: "test", + Name: "offset-test", + } +} + +func createTestPartition() topic.Partition { + return topic.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } +} + +func TestBrokerOffsetManager_AssignOffset(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Test sequential offset assignment + for i := int64(0); i < 10; i++ { + assignedOffset, err := manager.AssignOffset(testTopic, testPartition) + if err != nil { + t.Fatalf("Failed to assign offset %d: %v", i, err) + } + + if assignedOffset != i { + t.Errorf("Expected offset %d, got %d", i, assignedOffset) + } + } +} + +func TestBrokerOffsetManager_AssignBatchOffsets(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Assign batch of offsets + baseOffset, lastOffset, err := manager.AssignBatchOffsets(testTopic, testPartition, 5) + if err != nil { + t.Fatalf("Failed to assign batch offsets: %v", err) + } + + if baseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", baseOffset) + } + + if lastOffset != 4 { + t.Errorf("Expected last offset 4, got %d", lastOffset) + } + + // Assign another batch + baseOffset2, lastOffset2, err := manager.AssignBatchOffsets(testTopic, testPartition, 3) + if err != nil { + t.Fatalf("Failed to assign second batch offsets: %v", err) + } + + if baseOffset2 != 5 { + t.Errorf("Expected base offset 5, got %d", baseOffset2) + } + + if lastOffset2 != 7 { + t.Errorf("Expected last offset 7, got %d", lastOffset2) + } +} + +func TestBrokerOffsetManager_GetHighWaterMark(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Initially should be 0 + hwm, err := manager.GetHighWaterMark(testTopic, testPartition) + if err != nil { + t.Fatalf("Failed to get initial high water mark: %v", err) + } + + if hwm != 0 { + t.Errorf("Expected initial high water mark 0, got %d", hwm) + } + + // Assign some offsets + manager.AssignBatchOffsets(testTopic, testPartition, 10) + + // High water mark should be updated + hwm, err = manager.GetHighWaterMark(testTopic, testPartition) + if err != nil { + t.Fatalf("Failed to get high water mark after assignment: %v", err) + } + + if hwm != 10 { + t.Errorf("Expected high water mark 10, got %d", hwm) + } +} + +func TestBrokerOffsetManager_CreateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Assign some offsets first + manager.AssignBatchOffsets(testTopic, testPartition, 5) + + // Create subscription + sub, err := manager.CreateSubscription( + "test-sub", + testTopic, + testPartition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + if sub.ID != "test-sub" { + t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID) + } + + if sub.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", sub.StartOffset) + } +} + +func TestBrokerOffsetManager_GetPartitionOffsetInfo(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Test empty partition + info, err := manager.GetPartitionOffsetInfo(testTopic, testPartition) + if err != nil { + t.Fatalf("Failed to get partition offset info: %v", err) + } + + if info.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) + } + + if info.LatestOffset != -1 { + t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset) + } + + // Assign offsets and test again + manager.AssignBatchOffsets(testTopic, testPartition, 5) + + info, err = manager.GetPartitionOffsetInfo(testTopic, testPartition) + if err != nil { + t.Fatalf("Failed to get partition offset info after assignment: %v", err) + } + + if info.LatestOffset != 4 { + t.Errorf("Expected latest offset 4, got %d", info.LatestOffset) + } + + if info.HighWaterMark != 5 { + t.Errorf("Expected high water mark 5, got %d", info.HighWaterMark) + } +} + +func TestBrokerOffsetManager_MultiplePartitions(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + + // Create different partitions + partition1 := topic.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + partition2 := topic.Partition{ + RingSize: 1024, + RangeStart: 32, + RangeStop: 63, + UnixTimeNs: time.Now().UnixNano(), + } + + // Assign offsets to different partitions + assignedOffset1, err := manager.AssignOffset(testTopic, partition1) + if err != nil { + t.Fatalf("Failed to assign offset to partition1: %v", err) + } + + assignedOffset2, err := manager.AssignOffset(testTopic, partition2) + if err != nil { + t.Fatalf("Failed to assign offset to partition2: %v", err) + } + + // Both should start at 0 + if assignedOffset1 != 0 { + t.Errorf("Expected offset 0 for partition1, got %d", assignedOffset1) + } + + if assignedOffset2 != 0 { + t.Errorf("Expected offset 0 for partition2, got %d", assignedOffset2) + } + + // Assign more offsets to partition1 + assignedOffset1_2, err := manager.AssignOffset(testTopic, partition1) + if err != nil { + t.Fatalf("Failed to assign second offset to partition1: %v", err) + } + + if assignedOffset1_2 != 1 { + t.Errorf("Expected offset 1 for partition1, got %d", assignedOffset1_2) + } + + // Partition2 should still be at 0 for next assignment + assignedOffset2_2, err := manager.AssignOffset(testTopic, partition2) + if err != nil { + t.Fatalf("Failed to assign second offset to partition2: %v", err) + } + + if assignedOffset2_2 != 1 { + t.Errorf("Expected offset 1 for partition2, got %d", assignedOffset2_2) + } +} + +func TestOffsetAwarePublisher(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Create a mock local partition (simplified for testing) + localPartition := &topic.LocalPartition{} + + // Create offset assignment function + assignOffsetFn := func() (int64, error) { + return manager.AssignOffset(testTopic, testPartition) + } + + // Create offset-aware publisher + publisher := topic.NewOffsetAwarePublisher(localPartition, assignOffsetFn) + + if publisher.GetPartition() != localPartition { + t.Error("Publisher should return the correct partition") + } + + // Test would require more setup to actually publish messages + // This tests the basic structure +} + +func TestBrokerOffsetManager_GetOffsetMetrics(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Initial metrics + metrics := manager.GetOffsetMetrics() + if metrics.TotalOffsets != 0 { + t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets) + } + + // Assign some offsets + manager.AssignBatchOffsets(testTopic, testPartition, 5) + + // Create subscription + manager.CreateSubscription("test-sub", testTopic, testPartition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + + // Check updated metrics + metrics = manager.GetOffsetMetrics() + if metrics.PartitionCount != 1 { + t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount) + } +} + +func TestBrokerOffsetManager_AssignOffsetsWithResult(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Assign offsets with result + result := manager.AssignOffsetsWithResult(testTopic, testPartition, 3) + + if result.Error != nil { + t.Fatalf("Expected no error, got: %v", result.Error) + } + + if result.BaseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", result.BaseOffset) + } + + if result.LastOffset != 2 { + t.Errorf("Expected last offset 2, got %d", result.LastOffset) + } + + if result.Count != 3 { + t.Errorf("Expected count 3, got %d", result.Count) + } + + if result.Topic != testTopic { + t.Error("Topic mismatch in result") + } + + if result.Partition != testPartition { + t.Error("Partition mismatch in result") + } + + if result.Timestamp <= 0 { + t.Error("Timestamp should be set") + } +} + +func TestBrokerOffsetManager_Shutdown(t *testing.T) { + storage := NewInMemoryOffsetStorageForTesting() + manager := NewBrokerOffsetManagerWithStorage(storage) + testTopic := createTestTopic() + testPartition := createTestPartition() + + // Assign some offsets and create subscriptions + manager.AssignBatchOffsets(testTopic, testPartition, 5) + manager.CreateSubscription("test-sub", testTopic, testPartition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + + // Shutdown should not panic + manager.Shutdown() + + // After shutdown, operations should still work (using new managers) + offset, err := manager.AssignOffset(testTopic, testPartition) + if err != nil { + t.Fatalf("Operations should still work after shutdown: %v", err) + } + + // Should start from 0 again (new manager) + if offset != 0 { + t.Errorf("Expected offset 0 after shutdown, got %d", offset) + } +} |
