aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_offset_integration_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_offset_integration_test.go')
-rw-r--r--weed/mq/broker/broker_offset_integration_test.go351
1 files changed, 351 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_offset_integration_test.go b/weed/mq/broker/broker_offset_integration_test.go
new file mode 100644
index 000000000..49df58a64
--- /dev/null
+++ b/weed/mq/broker/broker_offset_integration_test.go
@@ -0,0 +1,351 @@
+package broker
+
+import (
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func createTestTopic() topic.Topic {
+ return topic.Topic{
+ Namespace: "test",
+ Name: "offset-test",
+ }
+}
+
+func createTestPartition() topic.Partition {
+ return topic.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+}
+
+func TestBrokerOffsetManager_AssignOffset(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Test sequential offset assignment
+ for i := int64(0); i < 10; i++ {
+ assignedOffset, err := manager.AssignOffset(testTopic, testPartition)
+ if err != nil {
+ t.Fatalf("Failed to assign offset %d: %v", i, err)
+ }
+
+ if assignedOffset != i {
+ t.Errorf("Expected offset %d, got %d", i, assignedOffset)
+ }
+ }
+}
+
+func TestBrokerOffsetManager_AssignBatchOffsets(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Assign batch of offsets
+ baseOffset, lastOffset, err := manager.AssignBatchOffsets(testTopic, testPartition, 5)
+ if err != nil {
+ t.Fatalf("Failed to assign batch offsets: %v", err)
+ }
+
+ if baseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", baseOffset)
+ }
+
+ if lastOffset != 4 {
+ t.Errorf("Expected last offset 4, got %d", lastOffset)
+ }
+
+ // Assign another batch
+ baseOffset2, lastOffset2, err := manager.AssignBatchOffsets(testTopic, testPartition, 3)
+ if err != nil {
+ t.Fatalf("Failed to assign second batch offsets: %v", err)
+ }
+
+ if baseOffset2 != 5 {
+ t.Errorf("Expected base offset 5, got %d", baseOffset2)
+ }
+
+ if lastOffset2 != 7 {
+ t.Errorf("Expected last offset 7, got %d", lastOffset2)
+ }
+}
+
+func TestBrokerOffsetManager_GetHighWaterMark(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Initially should be 0
+ hwm, err := manager.GetHighWaterMark(testTopic, testPartition)
+ if err != nil {
+ t.Fatalf("Failed to get initial high water mark: %v", err)
+ }
+
+ if hwm != 0 {
+ t.Errorf("Expected initial high water mark 0, got %d", hwm)
+ }
+
+ // Assign some offsets
+ manager.AssignBatchOffsets(testTopic, testPartition, 10)
+
+ // High water mark should be updated
+ hwm, err = manager.GetHighWaterMark(testTopic, testPartition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark after assignment: %v", err)
+ }
+
+ if hwm != 10 {
+ t.Errorf("Expected high water mark 10, got %d", hwm)
+ }
+}
+
+func TestBrokerOffsetManager_CreateSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Assign some offsets first
+ manager.AssignBatchOffsets(testTopic, testPartition, 5)
+
+ // Create subscription
+ sub, err := manager.CreateSubscription(
+ "test-sub",
+ testTopic,
+ testPartition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ if sub.ID != "test-sub" {
+ t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
+ }
+
+ if sub.StartOffset != 0 {
+ t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
+ }
+}
+
+func TestBrokerOffsetManager_GetPartitionOffsetInfo(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Test empty partition
+ info, err := manager.GetPartitionOffsetInfo(testTopic, testPartition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info: %v", err)
+ }
+
+ if info.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
+ }
+
+ if info.LatestOffset != -1 {
+ t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
+ }
+
+ // Assign offsets and test again
+ manager.AssignBatchOffsets(testTopic, testPartition, 5)
+
+ info, err = manager.GetPartitionOffsetInfo(testTopic, testPartition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info after assignment: %v", err)
+ }
+
+ if info.LatestOffset != 4 {
+ t.Errorf("Expected latest offset 4, got %d", info.LatestOffset)
+ }
+
+ if info.HighWaterMark != 5 {
+ t.Errorf("Expected high water mark 5, got %d", info.HighWaterMark)
+ }
+}
+
+func TestBrokerOffsetManager_MultiplePartitions(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+
+ // Create different partitions
+ partition1 := topic.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ partition2 := topic.Partition{
+ RingSize: 1024,
+ RangeStart: 32,
+ RangeStop: 63,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ // Assign offsets to different partitions
+ assignedOffset1, err := manager.AssignOffset(testTopic, partition1)
+ if err != nil {
+ t.Fatalf("Failed to assign offset to partition1: %v", err)
+ }
+
+ assignedOffset2, err := manager.AssignOffset(testTopic, partition2)
+ if err != nil {
+ t.Fatalf("Failed to assign offset to partition2: %v", err)
+ }
+
+ // Both should start at 0
+ if assignedOffset1 != 0 {
+ t.Errorf("Expected offset 0 for partition1, got %d", assignedOffset1)
+ }
+
+ if assignedOffset2 != 0 {
+ t.Errorf("Expected offset 0 for partition2, got %d", assignedOffset2)
+ }
+
+ // Assign more offsets to partition1
+ assignedOffset1_2, err := manager.AssignOffset(testTopic, partition1)
+ if err != nil {
+ t.Fatalf("Failed to assign second offset to partition1: %v", err)
+ }
+
+ if assignedOffset1_2 != 1 {
+ t.Errorf("Expected offset 1 for partition1, got %d", assignedOffset1_2)
+ }
+
+ // Partition2 should still be at 0 for next assignment
+ assignedOffset2_2, err := manager.AssignOffset(testTopic, partition2)
+ if err != nil {
+ t.Fatalf("Failed to assign second offset to partition2: %v", err)
+ }
+
+ if assignedOffset2_2 != 1 {
+ t.Errorf("Expected offset 1 for partition2, got %d", assignedOffset2_2)
+ }
+}
+
+func TestOffsetAwarePublisher(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Create a mock local partition (simplified for testing)
+ localPartition := &topic.LocalPartition{}
+
+ // Create offset assignment function
+ assignOffsetFn := func() (int64, error) {
+ return manager.AssignOffset(testTopic, testPartition)
+ }
+
+ // Create offset-aware publisher
+ publisher := topic.NewOffsetAwarePublisher(localPartition, assignOffsetFn)
+
+ if publisher.GetPartition() != localPartition {
+ t.Error("Publisher should return the correct partition")
+ }
+
+ // Test would require more setup to actually publish messages
+ // This tests the basic structure
+}
+
+func TestBrokerOffsetManager_GetOffsetMetrics(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Initial metrics
+ metrics := manager.GetOffsetMetrics()
+ if metrics.TotalOffsets != 0 {
+ t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
+ }
+
+ // Assign some offsets
+ manager.AssignBatchOffsets(testTopic, testPartition, 5)
+
+ // Create subscription
+ manager.CreateSubscription("test-sub", testTopic, testPartition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+
+ // Check updated metrics
+ metrics = manager.GetOffsetMetrics()
+ if metrics.PartitionCount != 1 {
+ t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
+ }
+}
+
+func TestBrokerOffsetManager_AssignOffsetsWithResult(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Assign offsets with result
+ result := manager.AssignOffsetsWithResult(testTopic, testPartition, 3)
+
+ if result.Error != nil {
+ t.Fatalf("Expected no error, got: %v", result.Error)
+ }
+
+ if result.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", result.BaseOffset)
+ }
+
+ if result.LastOffset != 2 {
+ t.Errorf("Expected last offset 2, got %d", result.LastOffset)
+ }
+
+ if result.Count != 3 {
+ t.Errorf("Expected count 3, got %d", result.Count)
+ }
+
+ if result.Topic != testTopic {
+ t.Error("Topic mismatch in result")
+ }
+
+ if result.Partition != testPartition {
+ t.Error("Partition mismatch in result")
+ }
+
+ if result.Timestamp <= 0 {
+ t.Error("Timestamp should be set")
+ }
+}
+
+func TestBrokerOffsetManager_Shutdown(t *testing.T) {
+ storage := NewInMemoryOffsetStorageForTesting()
+ manager := NewBrokerOffsetManagerWithStorage(storage)
+ testTopic := createTestTopic()
+ testPartition := createTestPartition()
+
+ // Assign some offsets and create subscriptions
+ manager.AssignBatchOffsets(testTopic, testPartition, 5)
+ manager.CreateSubscription("test-sub", testTopic, testPartition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+
+ // Shutdown should not panic
+ manager.Shutdown()
+
+ // After shutdown, operations should still work (using new managers)
+ offset, err := manager.AssignOffset(testTopic, testPartition)
+ if err != nil {
+ t.Fatalf("Operations should still work after shutdown: %v", err)
+ }
+
+ // Should start from 0 again (new manager)
+ if offset != 0 {
+ t.Errorf("Expected offset 0 after shutdown, got %d", offset)
+ }
+}