aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/integration_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/offset/integration_test.go')
-rw-r--r--weed/mq/offset/integration_test.go544
1 files changed, 544 insertions, 0 deletions
diff --git a/weed/mq/offset/integration_test.go b/weed/mq/offset/integration_test.go
new file mode 100644
index 000000000..35299be65
--- /dev/null
+++ b/weed/mq/offset/integration_test.go
@@ -0,0 +1,544 @@
+package offset
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func TestSMQOffsetIntegration_PublishRecord(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish a single record
+ response, err := integration.PublishRecord(
+ "test-namespace", "test-topic",
+ partition,
+ []byte("test-key"),
+ &schema_pb.RecordValue{},
+ )
+
+ if err != nil {
+ t.Fatalf("Failed to publish record: %v", err)
+ }
+
+ if response.Error != "" {
+ t.Errorf("Expected no error, got: %s", response.Error)
+ }
+
+ if response.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
+ }
+
+ if response.LastOffset != 0 {
+ t.Errorf("Expected last offset 0, got %d", response.LastOffset)
+ }
+}
+
+func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Create batch of records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+
+ // Publish batch
+ response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+ if err != nil {
+ t.Fatalf("Failed to publish record batch: %v", err)
+ }
+
+ if response.Error != "" {
+ t.Errorf("Expected no error, got: %s", response.Error)
+ }
+
+ if response.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
+ }
+
+ if response.LastOffset != 2 {
+ t.Errorf("Expected last offset 2, got %d", response.LastOffset)
+ }
+
+ // Verify high water mark
+ hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark: %v", err)
+ }
+
+ if hwm != 3 {
+ t.Errorf("Expected high water mark 3, got %d", hwm)
+ }
+}
+
+func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish empty batch
+ response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, []PublishRecordRequest{})
+ if err != nil {
+ t.Fatalf("Failed to publish empty batch: %v", err)
+ }
+
+ if response.Error == "" {
+ t.Error("Expected error for empty batch")
+ }
+}
+
+func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish some records first
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ sub, err := integration.CreateSubscription(
+ "test-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ if sub.ID != "test-sub" {
+ t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
+ }
+
+ if sub.StartOffset != 0 {
+ t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
+ }
+}
+
+func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish some records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ sub, err := integration.CreateSubscription(
+ "test-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Subscribe to records
+ responses, err := integration.SubscribeRecords(sub, 2)
+ if err != nil {
+ t.Fatalf("Failed to subscribe to records: %v", err)
+ }
+
+ if len(responses) != 2 {
+ t.Errorf("Expected 2 responses, got %d", len(responses))
+ }
+
+ // Check offset progression
+ if responses[0].Offset != 0 {
+ t.Errorf("Expected first record offset 0, got %d", responses[0].Offset)
+ }
+
+ if responses[1].Offset != 1 {
+ t.Errorf("Expected second record offset 1, got %d", responses[1].Offset)
+ }
+
+ // Check subscription advancement
+ if sub.CurrentOffset != 2 {
+ t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset)
+ }
+}
+
+func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Create subscription on empty partition
+ sub, err := integration.CreateSubscription(
+ "empty-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Subscribe to records (should return empty)
+ responses, err := integration.SubscribeRecords(sub, 10)
+ if err != nil {
+ t.Fatalf("Failed to subscribe to empty partition: %v", err)
+ }
+
+ if len(responses) != 0 {
+ t.Errorf("Expected 0 responses from empty partition, got %d", len(responses))
+ }
+}
+
+func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key4"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key5"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ sub, err := integration.CreateSubscription(
+ "seek-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Seek to offset 3
+ err = integration.SeekSubscription("seek-sub", 3)
+ if err != nil {
+ t.Fatalf("Failed to seek subscription: %v", err)
+ }
+
+ if sub.CurrentOffset != 3 {
+ t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset)
+ }
+
+ // Subscribe from new position
+ responses, err := integration.SubscribeRecords(sub, 2)
+ if err != nil {
+ t.Fatalf("Failed to subscribe after seek: %v", err)
+ }
+
+ if len(responses) != 2 {
+ t.Errorf("Expected 2 responses after seek, got %d", len(responses))
+ }
+
+ if responses[0].Offset != 3 {
+ t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset)
+ }
+}
+
+func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription at offset 1
+ sub, err := integration.CreateSubscription(
+ "lag-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_EXACT_OFFSET,
+ 1,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Get lag
+ lag, err := integration.GetSubscriptionLag("lag-sub")
+ if err != nil {
+ t.Fatalf("Failed to get subscription lag: %v", err)
+ }
+
+ expectedLag := int64(3 - 1) // hwm - current
+ if lag != expectedLag {
+ t.Errorf("Expected lag %d, got %d", expectedLag, lag)
+ }
+
+ // Advance subscription and check lag again
+ integration.SubscribeRecords(sub, 1)
+
+ lag, err = integration.GetSubscriptionLag("lag-sub")
+ if err != nil {
+ t.Fatalf("Failed to get lag after advance: %v", err)
+ }
+
+ expectedLag = int64(3 - 2) // hwm - current
+ if lag != expectedLag {
+ t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
+ }
+}
+
+func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Create subscription
+ _, err := integration.CreateSubscription(
+ "close-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Close subscription
+ err = integration.CloseSubscription("close-sub")
+ if err != nil {
+ t.Fatalf("Failed to close subscription: %v", err)
+ }
+
+ // Try to get lag (should fail)
+ _, err = integration.GetSubscriptionLag("close-sub")
+ if err == nil {
+ t.Error("Expected error when getting lag for closed subscription")
+ }
+}
+
+func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish some records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Test valid range
+ err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 2)
+ if err != nil {
+ t.Errorf("Valid range should not return error: %v", err)
+ }
+
+ // Test invalid range (beyond hwm)
+ err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 5)
+ if err == nil {
+ t.Error("Expected error for range beyond high water mark")
+ }
+}
+
+func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Test empty partition
+ offsetRange, err := integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get available range for empty partition: %v", err)
+ }
+
+ if offsetRange.Count != 0 {
+ t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
+ }
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Test with data
+ offsetRange, err = integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get available range: %v", err)
+ }
+
+ if offsetRange.StartOffset != 0 {
+ t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
+ }
+
+ if offsetRange.EndOffset != 1 {
+ t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset)
+ }
+
+ if offsetRange.Count != 2 {
+ t.Errorf("Expected count 2, got %d", offsetRange.Count)
+ }
+}
+
+func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Initial metrics
+ metrics := integration.GetOffsetMetrics()
+ if metrics.TotalOffsets != 0 {
+ t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
+ }
+
+ if metrics.ActiveSubscriptions != 0 {
+ t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions)
+ }
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscriptions
+ integration.CreateSubscription("sub1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ integration.CreateSubscription("sub2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+
+ // Check updated metrics
+ metrics = integration.GetOffsetMetrics()
+ if metrics.TotalOffsets != 2 {
+ t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets)
+ }
+
+ if metrics.ActiveSubscriptions != 2 {
+ t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions)
+ }
+
+ if metrics.PartitionCount != 1 {
+ t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
+ }
+}
+
+func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Test non-existent offset
+ info, err := integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0)
+ if err != nil {
+ t.Fatalf("Failed to get offset info: %v", err)
+ }
+
+ if info.Exists {
+ t.Error("Offset should not exist in empty partition")
+ }
+
+ // Publish record
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte("key1"), &schema_pb.RecordValue{})
+
+ // Test existing offset
+ info, err = integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0)
+ if err != nil {
+ t.Fatalf("Failed to get offset info for existing offset: %v", err)
+ }
+
+ if !info.Exists {
+ t.Error("Offset should exist after publishing")
+ }
+
+ if info.Offset != 0 {
+ t.Errorf("Expected offset 0, got %d", info.Offset)
+ }
+}
+
+func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Test empty partition
+ info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info: %v", err)
+ }
+
+ if info.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
+ }
+
+ if info.LatestOffset != -1 {
+ t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
+ }
+
+ if info.HighWaterMark != 0 {
+ t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark)
+ }
+
+ if info.RecordCount != 0 {
+ t.Errorf("Expected record count 0, got %d", info.RecordCount)
+ }
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ integration.CreateSubscription("test-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+
+ // Test with data
+ info, err = integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info with data: %v", err)
+ }
+
+ if info.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
+ }
+
+ if info.LatestOffset != 2 {
+ t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
+ }
+
+ if info.HighWaterMark != 3 {
+ t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
+ }
+
+ if info.RecordCount != 3 {
+ t.Errorf("Expected record count 3, got %d", info.RecordCount)
+ }
+
+ if info.ActiveSubscriptions != 1 {
+ t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions)
+ }
+}