Diffstat (limited to 'weed/mq/offset')
-rw-r--r--  weed/mq/offset/benchmark_test.go               454
-rw-r--r--  weed/mq/offset/consumer_group_storage.go       181
-rw-r--r--  weed/mq/offset/consumer_group_storage_test.go  128
-rw-r--r--  weed/mq/offset/end_to_end_test.go              472
-rw-r--r--  weed/mq/offset/filer_storage.go                100
-rw-r--r--  weed/mq/offset/integration.go                  380
-rw-r--r--  weed/mq/offset/integration_test.go             544
-rw-r--r--  weed/mq/offset/manager.go                      343
-rw-r--r--  weed/mq/offset/manager_test.go                 388
-rw-r--r--  weed/mq/offset/memory_storage_test.go          228
-rw-r--r--  weed/mq/offset/migration.go                    302
-rw-r--r--  weed/mq/offset/sql_storage.go                  394
-rw-r--r--  weed/mq/offset/sql_storage_test.go             516
-rw-r--r--  weed/mq/offset/storage.go                        5
-rw-r--r--  weed/mq/offset/subscriber.go                   355
-rw-r--r--  weed/mq/offset/subscriber_test.go              457
16 files changed, 5247 insertions, 0 deletions
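The change adds an offset package that wires together a per-partition offset assigner, offset-based subscriptions, and pluggable storage backends (in-memory, SQLite, and filer). As an orientation before the full diff, the sketch below shows how the pieces fit together, reusing the constructors and method names that appear in the tests in this change (NewInMemoryOffsetStorage, NewSMQOffsetIntegration, PublishRecordBatch, CreateSubscription, SubscribeRecords). The main wrapper, namespace, and topic names are illustrative only and not part of the commit, and note that SubscribeRecords in this change still returns stub records (the SMQ storage read is marked TODO in integration.go).

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/offset"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func main() {
	// In-memory storage is the simplest backend; the SQL and filer backends
	// in this diff implement the same storage interface.
	storage := offset.NewInMemoryOffsetStorage()
	integration := offset.NewSMQOffsetIntegration(storage)

	partition := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 0,
		RangeStop:  31,
		UnixTimeNs: time.Now().UnixNano(),
	}

	// Publish a small batch; offsets are assigned sequentially per partition.
	records := []offset.PublishRecordRequest{
		{Key: []byte("k1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("k2"), Value: &schema_pb.RecordValue{}},
	}
	resp, err := integration.PublishRecordBatch("my-namespace", "my-topic", partition, records)
	if err != nil || resp.Error != "" {
		log.Fatalf("publish failed: %v %s", err, resp.Error)
	}
	fmt.Println("base offset:", resp.BaseOffset, "last offset:", resp.LastOffset)

	// Read the batch back through an offset-based subscription.
	sub, err := integration.CreateSubscription(
		"example-sub", "my-namespace", "my-topic", partition,
		schema_pb.OffsetType_RESET_TO_EARLIEST, 0,
	)
	if err != nil {
		log.Fatal(err)
	}
	responses, err := integration.SubscribeRecords(sub, 10)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("read", len(responses), "records; next offset:", sub.CurrentOffset)
}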
diff --git a/weed/mq/offset/benchmark_test.go b/weed/mq/offset/benchmark_test.go new file mode 100644 index 000000000..d6f33206f --- /dev/null +++ b/weed/mq/offset/benchmark_test.go @@ -0,0 +1,454 @@ +package offset + +import ( + "fmt" + "os" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// BenchmarkOffsetAssignment benchmarks sequential offset assignment +func BenchmarkOffsetAssignment(b *testing.B) { + storage := NewInMemoryOffsetStorage() + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + b.Fatalf("Failed to create partition manager: %v", err) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + manager.AssignOffset() + } + }) +} + +// BenchmarkBatchOffsetAssignment benchmarks batch offset assignment +func BenchmarkBatchOffsetAssignment(b *testing.B) { + storage := NewInMemoryOffsetStorage() + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + b.Fatalf("Failed to create partition manager: %v", err) + } + + batchSizes := []int64{1, 10, 100, 1000} + + for _, batchSize := range batchSizes { + b.Run(fmt.Sprintf("BatchSize%d", batchSize), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + manager.AssignOffsets(batchSize) + } + }) + } +} + +// BenchmarkSQLOffsetStorage benchmarks SQL storage operations +func BenchmarkSQLOffsetStorage(b *testing.B) { + // Create temporary database + tmpFile, err := os.CreateTemp("", "benchmark_*.db") + if err != nil { + b.Fatalf("Failed to create temp database: %v", err) + } + tmpFile.Close() + defer os.Remove(tmpFile.Name()) + + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + b.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + b.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + partitionKey := partitionKey(partition) + + b.Run("SaveCheckpoint", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.SaveCheckpoint("test-namespace", "test-topic", partition, int64(i)) + } + }) + + b.Run("LoadCheckpoint", func(b *testing.B) { + storage.SaveCheckpoint("test-namespace", "test-topic", partition, 1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.LoadCheckpoint("test-namespace", "test-topic", partition) + } + }) + + b.Run("SaveOffsetMapping", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), 100) + } + }) + + // Pre-populate for read benchmarks + for i := 0; i < 1000; i++ { + storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), 100) + } + + b.Run("GetHighestOffset", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.GetHighestOffset("test-namespace", "test-topic", partition) + } + }) + + b.Run("LoadOffsetMappings", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.LoadOffsetMappings(partitionKey) + } + }) + + 
b.Run("GetOffsetMappingsByRange", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + start := int64(i % 900) + end := start + 100 + storage.GetOffsetMappingsByRange(partitionKey, start, end) + } + }) + + b.Run("GetPartitionStats", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.GetPartitionStats(partitionKey) + } + }) +} + +// BenchmarkInMemoryVsSQL compares in-memory and SQL storage performance +func BenchmarkInMemoryVsSQL(b *testing.B) { + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + // In-memory storage benchmark + b.Run("InMemory", func(b *testing.B) { + storage := NewInMemoryOffsetStorage() + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + b.Fatalf("Failed to create partition manager: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + manager.AssignOffset() + } + }) + + // SQL storage benchmark + b.Run("SQL", func(b *testing.B) { + tmpFile, err := os.CreateTemp("", "benchmark_sql_*.db") + if err != nil { + b.Fatalf("Failed to create temp database: %v", err) + } + tmpFile.Close() + defer os.Remove(tmpFile.Name()) + + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + b.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + b.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + b.Fatalf("Failed to create partition manager: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + manager.AssignOffset() + } + }) +} + +// BenchmarkOffsetSubscription benchmarks subscription operations +func BenchmarkOffsetSubscription(b *testing.B) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + // Pre-assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 10000) + + b.Run("CreateSubscription", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + subscriptionID := fmt.Sprintf("bench-sub-%d", i) + sub, err := subscriber.CreateSubscription( + subscriptionID, + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + b.Fatalf("Failed to create subscription: %v", err) + } + subscriber.CloseSubscription(subscriptionID) + _ = sub + } + }) + + // Create subscription for other benchmarks + sub, err := subscriber.CreateSubscription( + "bench-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + b.Fatalf("Failed to create subscription: %v", err) + } + + b.Run("GetOffsetRange", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + sub.GetOffsetRange(100) + } + }) + + b.Run("AdvanceOffset", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + sub.AdvanceOffset() + } + }) + + b.Run("GetLag", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + sub.GetLag() + } + }) + + b.Run("SeekToOffset", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + offset := int64(i % 9000) // Stay within bounds + sub.SeekToOffset(offset) + } + }) 
+} + +// BenchmarkSMQOffsetIntegration benchmarks the full integration layer +func BenchmarkSMQOffsetIntegration(b *testing.B) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + b.Run("PublishRecord", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := fmt.Sprintf("key-%d", i) + integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{}) + } + }) + + b.Run("PublishRecordBatch", func(b *testing.B) { + batchSizes := []int{1, 10, 100} + + for _, batchSize := range batchSizes { + b.Run(fmt.Sprintf("BatchSize%d", batchSize), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + records := make([]PublishRecordRequest, batchSize) + for j := 0; j < batchSize; j++ { + records[j] = PublishRecordRequest{ + Key: []byte(fmt.Sprintf("batch-%d-key-%d", i, j)), + Value: &schema_pb.RecordValue{}, + } + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + } + }) + } + }) + + // Pre-populate for subscription benchmarks + records := make([]PublishRecordRequest, 1000) + for i := 0; i < 1000; i++ { + records[i] = PublishRecordRequest{ + Key: []byte(fmt.Sprintf("pre-key-%d", i)), + Value: &schema_pb.RecordValue{}, + } + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + b.Run("CreateSubscription", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + subscriptionID := fmt.Sprintf("integration-sub-%d", i) + sub, err := integration.CreateSubscription( + subscriptionID, + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + b.Fatalf("Failed to create subscription: %v", err) + } + integration.CloseSubscription(subscriptionID) + _ = sub + } + }) + + b.Run("GetHighWaterMark", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + integration.GetHighWaterMark("test-namespace", "test-topic", partition) + } + }) + + b.Run("GetPartitionOffsetInfo", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) + } + }) +} + +// BenchmarkConcurrentOperations benchmarks concurrent offset operations +func BenchmarkConcurrentOperations(b *testing.B) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + b.Run("ConcurrentPublish", func(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("concurrent-key-%d", i) + integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{}) + i++ + } + }) + }) + + // Pre-populate for concurrent reads + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("read-key-%d", i) + integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{}) + } + + b.Run("ConcurrentRead", func(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + integration.GetHighWaterMark("test-namespace", "test-topic", partition) + } + }) + }) + + b.Run("ConcurrentMixed", func(b *testing.B) { + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for 
pb.Next() { + if i%10 == 0 { + // 10% writes + key := fmt.Sprintf("mixed-key-%d", i) + integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{}) + } else { + // 90% reads + integration.GetHighWaterMark("test-namespace", "test-topic", partition) + } + i++ + } + }) + }) +} + +// BenchmarkMemoryUsage benchmarks memory usage patterns +func BenchmarkMemoryUsage(b *testing.B) { + b.Run("InMemoryStorage", func(b *testing.B) { + storage := NewInMemoryOffsetStorage() + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + b.Fatalf("Failed to create partition manager: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + manager.AssignOffset() + if i%1000 == 0 { + // Periodic checkpoint to simulate real usage + manager.checkpoint(int64(i)) + } + } + }) +} diff --git a/weed/mq/offset/consumer_group_storage.go b/weed/mq/offset/consumer_group_storage.go new file mode 100644 index 000000000..74c2db908 --- /dev/null +++ b/weed/mq/offset/consumer_group_storage.go @@ -0,0 +1,181 @@ +package offset + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// ConsumerGroupPosition represents a consumer's position in a partition +// This can be either a timestamp or an offset +type ConsumerGroupPosition struct { + Type string `json:"type"` // "offset" or "timestamp" + Value int64 `json:"value"` // The actual offset or timestamp value + OffsetType string `json:"offset_type"` // Optional: OffsetType enum name (e.g., "EXACT_OFFSET") + CommittedAt int64 `json:"committed_at"` // Unix timestamp in milliseconds when committed + Metadata string `json:"metadata"` // Optional: application-specific metadata +} + +// ConsumerGroupOffsetStorage handles consumer group offset persistence +// Each consumer group gets its own offset file in a dedicated consumers/ subfolder: +// Path: /topics/{namespace}/{topic}/{version}/{partition}/consumers/{consumer_group}.offset +type ConsumerGroupOffsetStorage interface { + // SaveConsumerGroupOffset saves the committed offset for a consumer group + SaveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error + + // SaveConsumerGroupPosition saves the committed position (offset or timestamp) for a consumer group + SaveConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string, position *ConsumerGroupPosition) error + + // LoadConsumerGroupOffset loads the committed offset for a consumer group (backward compatible) + LoadConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) (int64, error) + + // LoadConsumerGroupPosition loads the committed position for a consumer group + LoadConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string) (*ConsumerGroupPosition, error) + + // ListConsumerGroups returns all consumer groups for a topic partition + ListConsumerGroups(t topic.Topic, p topic.Partition) ([]string, error) + + // DeleteConsumerGroupOffset removes the offset file for a consumer group + DeleteConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) error +} + 
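// ---------------------------------------------------------------------------
// Illustrative usage (editor's sketch, not part of this change): how a caller
// can commit progress through the ConsumerGroupOffsetStorage interface above
// and recover it after a restart. The group name and offset value are
// placeholders; error handling is abbreviated.
// ---------------------------------------------------------------------------

func exampleResumeFromCommittedPosition(store ConsumerGroupOffsetStorage, t topic.Topic, p topic.Partition) (int64, error) {
	const group = "example-group"

	// Commit offset 42. SaveConsumerGroupOffset wraps the value in a
	// ConsumerGroupPosition with Type "offset" and persists it as JSON under
	// .../{partition}/consumers/example-group.offset.
	if err := store.SaveConsumerGroupOffset(t, p, group, 42); err != nil {
		return -1, err
	}

	// After a restart, load the full position record so the caller can tell
	// an offset-based commit from a timestamp-based one.
	pos, err := store.LoadConsumerGroupPosition(t, p, group)
	if err != nil {
		return -1, err
	}
	if pos.Type == "timestamp" {
		return -1, fmt.Errorf("position is a timestamp (%d ns); resume with a time-based seek instead", pos.Value)
	}
	return pos.Value, nil
}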
+// FilerConsumerGroupOffsetStorage implements ConsumerGroupOffsetStorage using SeaweedFS filer +type FilerConsumerGroupOffsetStorage struct { + filerClientAccessor *filer_client.FilerClientAccessor +} + +// NewFilerConsumerGroupOffsetStorageWithAccessor creates storage using a shared filer client accessor +func NewFilerConsumerGroupOffsetStorageWithAccessor(filerClientAccessor *filer_client.FilerClientAccessor) *FilerConsumerGroupOffsetStorage { + return &FilerConsumerGroupOffsetStorage{ + filerClientAccessor: filerClientAccessor, + } +} + +// SaveConsumerGroupOffset saves the committed offset for a consumer group +// Stores as: /topics/{namespace}/{topic}/{version}/{partition}/consumers/{consumer_group}.offset +// This is a convenience method that wraps SaveConsumerGroupPosition +func (f *FilerConsumerGroupOffsetStorage) SaveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error { + position := &ConsumerGroupPosition{ + Type: "offset", + Value: offset, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET.String(), + CommittedAt: time.Now().UnixMilli(), + } + return f.SaveConsumerGroupPosition(t, p, consumerGroup, position) +} + +// SaveConsumerGroupPosition saves the committed position (offset or timestamp) for a consumer group +// Stores as JSON: /topics/{namespace}/{topic}/{version}/{partition}/consumers/{consumer_group}.offset +func (f *FilerConsumerGroupOffsetStorage) SaveConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string, position *ConsumerGroupPosition) error { + partitionDir := topic.PartitionDir(t, p) + consumersDir := fmt.Sprintf("%s/consumers", partitionDir) + offsetFileName := fmt.Sprintf("%s.offset", consumerGroup) + + // Marshal position to JSON + jsonBytes, err := json.Marshal(position) + if err != nil { + return fmt.Errorf("failed to marshal position to JSON: %w", err) + } + + return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, consumersDir, offsetFileName, jsonBytes) + }) +} + +// LoadConsumerGroupOffset loads the committed offset for a consumer group +// This method provides backward compatibility and returns just the offset value +func (f *FilerConsumerGroupOffsetStorage) LoadConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) (int64, error) { + position, err := f.LoadConsumerGroupPosition(t, p, consumerGroup) + if err != nil { + return -1, err + } + return position.Value, nil +} + +// LoadConsumerGroupPosition loads the committed position for a consumer group +func (f *FilerConsumerGroupOffsetStorage) LoadConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string) (*ConsumerGroupPosition, error) { + partitionDir := topic.PartitionDir(t, p) + consumersDir := fmt.Sprintf("%s/consumers", partitionDir) + offsetFileName := fmt.Sprintf("%s.offset", consumerGroup) + + var position *ConsumerGroupPosition + err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := filer.ReadInsideFiler(client, consumersDir, offsetFileName) + if err != nil { + return err + } + + // Parse JSON format + position = &ConsumerGroupPosition{} + if err := json.Unmarshal(data, position); err != nil { + return fmt.Errorf("invalid consumer group offset file format: %w", err) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return position, nil +} + +// ListConsumerGroups returns all consumer groups for a topic partition +func (f 
*FilerConsumerGroupOffsetStorage) ListConsumerGroups(t topic.Topic, p topic.Partition) ([]string, error) { + partitionDir := topic.PartitionDir(t, p) + consumersDir := fmt.Sprintf("%s/consumers", partitionDir) + var consumerGroups []string + + err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Use ListEntries to get directory contents + stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: consumersDir, + }) + if err != nil { + return err + } + + for { + resp, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + return err + } + + entry := resp.Entry + if entry != nil && !entry.IsDirectory && entry.Name != "" { + // Check if this is a consumer group offset file (ends with .offset) + if len(entry.Name) > 7 && entry.Name[len(entry.Name)-7:] == ".offset" { + // Extract consumer group name (remove .offset suffix) + consumerGroup := entry.Name[:len(entry.Name)-7] + consumerGroups = append(consumerGroups, consumerGroup) + } + } + } + return nil + }) + + return consumerGroups, err +} + +// DeleteConsumerGroupOffset removes the offset file for a consumer group +func (f *FilerConsumerGroupOffsetStorage) DeleteConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) error { + partitionDir := topic.PartitionDir(t, p) + consumersDir := fmt.Sprintf("%s/consumers", partitionDir) + offsetFileName := fmt.Sprintf("%s.offset", consumerGroup) + + return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.DoRemove(context.Background(), client, consumersDir, offsetFileName, false, false, false, false, nil) + }) +} diff --git a/weed/mq/offset/consumer_group_storage_test.go b/weed/mq/offset/consumer_group_storage_test.go new file mode 100644 index 000000000..ff1163e93 --- /dev/null +++ b/weed/mq/offset/consumer_group_storage_test.go @@ -0,0 +1,128 @@ +package offset + +import ( + "encoding/json" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestConsumerGroupPosition_JSON(t *testing.T) { + tests := []struct { + name string + position *ConsumerGroupPosition + }{ + { + name: "offset-based position", + position: &ConsumerGroupPosition{ + Type: "offset", + Value: 12345, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET.String(), + CommittedAt: time.Now().UnixMilli(), + Metadata: "test metadata", + }, + }, + { + name: "timestamp-based position", + position: &ConsumerGroupPosition{ + Type: "timestamp", + Value: time.Now().UnixNano(), + OffsetType: schema_pb.OffsetType_EXACT_TS_NS.String(), + CommittedAt: time.Now().UnixMilli(), + Metadata: "checkpoint at 2024-10-05", + }, + }, + { + name: "minimal position", + position: &ConsumerGroupPosition{ + Type: "offset", + Value: 42, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Marshal to JSON + jsonBytes, err := json.Marshal(tt.position) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + t.Logf("JSON: %s", string(jsonBytes)) + + // Unmarshal from JSON + var decoded ConsumerGroupPosition + if err := json.Unmarshal(jsonBytes, &decoded); err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + // Verify fields + if decoded.Type != tt.position.Type { + t.Errorf("Type mismatch: got %s, want %s", decoded.Type, tt.position.Type) + } + if decoded.Value != tt.position.Value { + t.Errorf("Value mismatch: got %d, want %d", decoded.Value, tt.position.Value) + } + if decoded.OffsetType != 
tt.position.OffsetType { + t.Errorf("OffsetType mismatch: got %s, want %s", decoded.OffsetType, tt.position.OffsetType) + } + if decoded.Metadata != tt.position.Metadata { + t.Errorf("Metadata mismatch: got %s, want %s", decoded.Metadata, tt.position.Metadata) + } + }) + } +} + +func TestConsumerGroupPosition_JSONExamples(t *testing.T) { + // Test JSON format examples + jsonExamples := []string{ + `{"type":"offset","value":12345}`, + `{"type":"timestamp","value":1696521600000000000}`, + `{"type":"offset","value":42,"offset_type":"EXACT_OFFSET","committed_at":1696521600000,"metadata":"test"}`, + } + + for i, jsonStr := range jsonExamples { + var position ConsumerGroupPosition + if err := json.Unmarshal([]byte(jsonStr), &position); err != nil { + t.Errorf("Example %d: Failed to parse JSON: %v", i, err) + continue + } + + t.Logf("Example %d: Type=%s, Value=%d", i, position.Type, position.Value) + + // Verify required fields + if position.Type == "" { + t.Errorf("Example %d: Type is empty", i) + } + if position.Value == 0 { + t.Errorf("Example %d: Value is zero", i) + } + } +} + +func TestConsumerGroupPosition_TypeValidation(t *testing.T) { + validTypes := []string{"offset", "timestamp"} + + for _, typ := range validTypes { + position := &ConsumerGroupPosition{ + Type: typ, + Value: 100, + } + + jsonBytes, err := json.Marshal(position) + if err != nil { + t.Fatalf("Failed to marshal position with type '%s': %v", typ, err) + } + + var decoded ConsumerGroupPosition + if err := json.Unmarshal(jsonBytes, &decoded); err != nil { + t.Fatalf("Failed to unmarshal position with type '%s': %v", typ, err) + } + + if decoded.Type != typ { + t.Errorf("Type mismatch: got '%s', want '%s'", decoded.Type, typ) + } + } +} diff --git a/weed/mq/offset/end_to_end_test.go b/weed/mq/offset/end_to_end_test.go new file mode 100644 index 000000000..a4db891e1 --- /dev/null +++ b/weed/mq/offset/end_to_end_test.go @@ -0,0 +1,472 @@ +package offset + +import ( + "fmt" + "os" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// TestEndToEndOffsetFlow tests the complete offset management flow +func TestEndToEndOffsetFlow(t *testing.T) { + // Create temporary database + tmpFile, err := os.CreateTemp("", "e2e_offset_test_*.db") + if err != nil { + t.Fatalf("Failed to create temp database: %v", err) + } + tmpFile.Close() + defer os.Remove(tmpFile.Name()) + + // Create database with migrations + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create SQL storage + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + // Create SMQ offset integration + integration := NewSMQOffsetIntegration(storage) + + // Test partition + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + t.Run("PublishAndAssignOffsets", func(t *testing.T) { + // Simulate publishing messages with offset assignment + records := []PublishRecordRequest{ + {Key: []byte("user1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("user2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("user3"), Value: &schema_pb.RecordValue{}}, + } + + response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + if err != nil { + t.Fatalf("Failed to publish record batch: %v", err) + } + + if response.BaseOffset != 0 { + 
t.Errorf("Expected base offset 0, got %d", response.BaseOffset) + } + + if response.LastOffset != 2 { + t.Errorf("Expected last offset 2, got %d", response.LastOffset) + } + + // Verify high water mark + hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get high water mark: %v", err) + } + + if hwm != 3 { + t.Errorf("Expected high water mark 3, got %d", hwm) + } + }) + + t.Run("CreateAndUseSubscription", func(t *testing.T) { + // Create subscription from earliest + sub, err := integration.CreateSubscription( + "e2e-test-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Subscribe to records + responses, err := integration.SubscribeRecords(sub, 2) + if err != nil { + t.Fatalf("Failed to subscribe to records: %v", err) + } + + if len(responses) != 2 { + t.Errorf("Expected 2 responses, got %d", len(responses)) + } + + // Check subscription advancement + if sub.CurrentOffset != 2 { + t.Errorf("Expected current offset 2, got %d", sub.CurrentOffset) + } + + // Get subscription lag + lag, err := sub.GetLag() + if err != nil { + t.Fatalf("Failed to get lag: %v", err) + } + + if lag != 1 { // 3 (hwm) - 2 (current) = 1 + t.Errorf("Expected lag 1, got %d", lag) + } + }) + + t.Run("OffsetSeekingAndRanges", func(t *testing.T) { + // Create subscription at specific offset + sub, err := integration.CreateSubscription( + "seek-test-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_EXACT_OFFSET, + 1, + ) + if err != nil { + t.Fatalf("Failed to create subscription at offset 1: %v", err) + } + + // Verify starting position + if sub.CurrentOffset != 1 { + t.Errorf("Expected current offset 1, got %d", sub.CurrentOffset) + } + + // Get offset range + offsetRange, err := sub.GetOffsetRange(2) + if err != nil { + t.Fatalf("Failed to get offset range: %v", err) + } + + if offsetRange.StartOffset != 1 { + t.Errorf("Expected start offset 1, got %d", offsetRange.StartOffset) + } + + if offsetRange.Count != 2 { + t.Errorf("Expected count 2, got %d", offsetRange.Count) + } + + // Seek to different offset + err = sub.SeekToOffset(0) + if err != nil { + t.Fatalf("Failed to seek to offset 0: %v", err) + } + + if sub.CurrentOffset != 0 { + t.Errorf("Expected current offset 0 after seek, got %d", sub.CurrentOffset) + } + }) + + t.Run("PartitionInformationAndMetrics", func(t *testing.T) { + // Get partition offset info + info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get partition offset info: %v", err) + } + + if info.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) + } + + if info.LatestOffset != 2 { + t.Errorf("Expected latest offset 2, got %d", info.LatestOffset) + } + + if info.HighWaterMark != 3 { + t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark) + } + + if info.ActiveSubscriptions != 2 { // Two subscriptions created above + t.Errorf("Expected 2 active subscriptions, got %d", info.ActiveSubscriptions) + } + + // Get offset metrics + metrics := integration.GetOffsetMetrics() + if metrics.PartitionCount != 1 { + t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount) + } + + if metrics.ActiveSubscriptions != 2 { + t.Errorf("Expected 2 active subscriptions in metrics, got %d", metrics.ActiveSubscriptions) + } + }) +} + +// 
TestOffsetPersistenceAcrossRestarts tests that offsets persist across system restarts +func TestOffsetPersistenceAcrossRestarts(t *testing.T) { + // Create temporary database + tmpFile, err := os.CreateTemp("", "persistence_test_*.db") + if err != nil { + t.Fatalf("Failed to create temp database: %v", err) + } + tmpFile.Close() + defer os.Remove(tmpFile.Name()) + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + var lastOffset int64 + + // First session: Create database and assign offsets + { + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + + integration := NewSMQOffsetIntegration(storage) + + // Publish some records + records := []PublishRecordRequest{ + {Key: []byte("msg1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("msg2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("msg3"), Value: &schema_pb.RecordValue{}}, + } + + response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + if err != nil { + t.Fatalf("Failed to publish records: %v", err) + } + + lastOffset = response.LastOffset + + // Close connections + storage.Close() + db.Close() + } + + // Second session: Reopen database and verify persistence + { + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to reopen database: %v", err) + } + defer db.Close() + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + integration := NewSMQOffsetIntegration(storage) + + // Verify high water mark persisted + hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get high water mark after restart: %v", err) + } + + if hwm != lastOffset+1 { + t.Errorf("Expected high water mark %d after restart, got %d", lastOffset+1, hwm) + } + + // Assign new offsets and verify continuity + newResponse, err := integration.PublishRecord("test-namespace", "test-topic", partition, []byte("msg4"), &schema_pb.RecordValue{}) + if err != nil { + t.Fatalf("Failed to publish new record after restart: %v", err) + } + + expectedNextOffset := lastOffset + 1 + if newResponse.BaseOffset != expectedNextOffset { + t.Errorf("Expected next offset %d after restart, got %d", expectedNextOffset, newResponse.BaseOffset) + } + } +} + +// TestConcurrentOffsetOperations tests concurrent offset operations +func TestConcurrentOffsetOperations(t *testing.T) { + // Create temporary database + tmpFile, err := os.CreateTemp("", "concurrent_test_*.db") + if err != nil { + t.Fatalf("Failed to create temp database: %v", err) + } + tmpFile.Close() + defer os.Remove(tmpFile.Name()) + + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + integration := NewSMQOffsetIntegration(storage) + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + // Concurrent publishers + const numPublishers = 5 + const recordsPerPublisher = 10 + + done := make(chan bool, numPublishers) + + for i := 0; i < 
numPublishers; i++ { + go func(publisherID int) { + defer func() { done <- true }() + + for j := 0; j < recordsPerPublisher; j++ { + key := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j) + _, err := integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{}) + if err != nil { + t.Errorf("Publisher %d failed to publish message %d: %v", publisherID, j, err) + return + } + } + }(i) + } + + // Wait for all publishers to complete + for i := 0; i < numPublishers; i++ { + <-done + } + + // Verify total records + hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get high water mark: %v", err) + } + + expectedTotal := int64(numPublishers * recordsPerPublisher) + if hwm != expectedTotal { + t.Errorf("Expected high water mark %d, got %d", expectedTotal, hwm) + } + + // Verify no duplicate offsets + info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get partition info: %v", err) + } + + if info.RecordCount != expectedTotal { + t.Errorf("Expected record count %d, got %d", expectedTotal, info.RecordCount) + } +} + +// TestOffsetValidationAndErrorHandling tests error conditions and validation +func TestOffsetValidationAndErrorHandling(t *testing.T) { + // Create temporary database + tmpFile, err := os.CreateTemp("", "validation_test_*.db") + if err != nil { + t.Fatalf("Failed to create temp database: %v", err) + } + tmpFile.Close() + defer os.Remove(tmpFile.Name()) + + db, err := CreateDatabase(tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + integration := NewSMQOffsetIntegration(storage) + + partition := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + t.Run("InvalidOffsetSubscription", func(t *testing.T) { + // Try to create subscription with invalid offset + _, err := integration.CreateSubscription( + "invalid-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_EXACT_OFFSET, + 100, // Beyond any existing data + ) + if err == nil { + t.Error("Expected error for subscription beyond high water mark") + } + }) + + t.Run("NegativeOffsetValidation", func(t *testing.T) { + // Try to create subscription with negative offset + _, err := integration.CreateSubscription( + "negative-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_EXACT_OFFSET, + -1, + ) + if err == nil { + t.Error("Expected error for negative offset") + } + }) + + t.Run("DuplicateSubscriptionID", func(t *testing.T) { + // Create first subscription + _, err := integration.CreateSubscription( + "duplicate-id", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create first subscription: %v", err) + } + + // Try to create duplicate + _, err = integration.CreateSubscription( + "duplicate-id", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err == nil { + t.Error("Expected error for duplicate subscription ID") + } + }) + + t.Run("OffsetRangeValidation", func(t *testing.T) { + // Add some data first + integration.PublishRecord("test-namespace", "test-topic", partition, []byte("test"), 
&schema_pb.RecordValue{}) + + // Test invalid range validation + err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 5, 10) // Beyond high water mark + if err == nil { + t.Error("Expected error for range beyond high water mark") + } + + err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 10, 5) // End before start + if err == nil { + t.Error("Expected error for end offset before start offset") + } + + err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, -1, 5) // Negative start + if err == nil { + t.Error("Expected error for negative start offset") + } + }) +} diff --git a/weed/mq/offset/filer_storage.go b/weed/mq/offset/filer_storage.go new file mode 100644 index 000000000..81be78470 --- /dev/null +++ b/weed/mq/offset/filer_storage.go @@ -0,0 +1,100 @@ +package offset + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// FilerOffsetStorage implements OffsetStorage using SeaweedFS filer +// Stores offset data as files in the same directory structure as SMQ +// Path: /topics/{namespace}/{topic}/{version}/{partition}/checkpoint.offset +// The namespace and topic are derived from the actual partition information +type FilerOffsetStorage struct { + filerClientAccessor *filer_client.FilerClientAccessor +} + +// NewFilerOffsetStorageWithAccessor creates a new filer-based offset storage using existing filer client accessor +func NewFilerOffsetStorageWithAccessor(filerClientAccessor *filer_client.FilerClientAccessor) *FilerOffsetStorage { + return &FilerOffsetStorage{ + filerClientAccessor: filerClientAccessor, + } +} + +// SaveCheckpoint saves the checkpoint for a partition +// Stores as: /topics/{namespace}/{topic}/{version}/{partition}/checkpoint.offset +func (f *FilerOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error { + partitionDir := f.getPartitionDir(namespace, topicName, partition) + fileName := "checkpoint.offset" + + // Use SMQ's 8-byte offset format + offsetBytes := make([]byte, 8) + util.Uint64toBytes(offsetBytes, uint64(offset)) + + return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, partitionDir, fileName, offsetBytes) + }) +} + +// LoadCheckpoint loads the checkpoint for a partition +func (f *FilerOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + partitionDir := f.getPartitionDir(namespace, topicName, partition) + fileName := "checkpoint.offset" + + var offset int64 = -1 + err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := filer.ReadInsideFiler(client, partitionDir, fileName) + if err != nil { + return err + } + if len(data) != 8 { + return fmt.Errorf("invalid checkpoint file format: expected 8 bytes, got %d", len(data)) + } + offset = int64(util.BytesToUint64(data)) + return nil + }) + + if err != nil { + return -1, err + } + + return offset, nil +} + +// GetHighestOffset returns the highest offset stored for a partition +// For filer storage, this is the same as the checkpoint since we don't store individual records +func (f *FilerOffsetStorage) GetHighestOffset(namespace, topicName string, 
partition *schema_pb.Partition) (int64, error) { + return f.LoadCheckpoint(namespace, topicName, partition) +} + +// Reset clears all data for testing +func (f *FilerOffsetStorage) Reset() error { + // For testing, we could delete all offset files, but this is dangerous + // Instead, just return success - individual tests should clean up their own data + return nil +} + +// Helper methods + +// getPartitionDir returns the directory path for a partition following SMQ convention +// Format: /topics/{namespace}/{topic}/{version}/{partition} +func (f *FilerOffsetStorage) getPartitionDir(namespace, topicName string, partition *schema_pb.Partition) string { + // Generate version from UnixTimeNs + version := time.Unix(0, partition.UnixTimeNs).UTC().Format("v2006-01-02-15-04-05") + + // Generate partition range string + partitionRange := fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop) + + return fmt.Sprintf("%s/%s/%s/%s/%s", filer.TopicsDir, namespace, topicName, version, partitionRange) +} + +// getPartitionKey generates a unique key for a partition +func (f *FilerOffsetStorage) getPartitionKey(partition *schema_pb.Partition) string { + return fmt.Sprintf("ring:%d:range:%d-%d:time:%d", + partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs) +} diff --git a/weed/mq/offset/integration.go b/weed/mq/offset/integration.go new file mode 100644 index 000000000..4b9ee6183 --- /dev/null +++ b/weed/mq/offset/integration.go @@ -0,0 +1,380 @@ +package offset + +import ( + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// SMQOffsetIntegration provides integration between offset management and SMQ broker +type SMQOffsetIntegration struct { + mu sync.RWMutex + offsetAssigner *OffsetAssigner + offsetSubscriber *OffsetSubscriber + offsetSeeker *OffsetSeeker +} + +// NewSMQOffsetIntegration creates a new SMQ offset integration +func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration { + registry := NewPartitionOffsetRegistry(storage) + assigner := &OffsetAssigner{registry: registry} + + return &SMQOffsetIntegration{ + offsetAssigner: assigner, + offsetSubscriber: NewOffsetSubscriber(registry), + offsetSeeker: NewOffsetSeeker(registry), + } +} + +// PublishRecord publishes a record and assigns it an offset +func (integration *SMQOffsetIntegration) PublishRecord( + namespace, topicName string, + partition *schema_pb.Partition, + key []byte, + value *schema_pb.RecordValue, +) (*mq_agent_pb.PublishRecordResponse, error) { + + // Assign offset for this record + result := integration.offsetAssigner.AssignSingleOffset(namespace, topicName, partition) + if result.Error != nil { + return &mq_agent_pb.PublishRecordResponse{ + Error: fmt.Sprintf("Failed to assign offset: %v", result.Error), + }, nil + } + + assignment := result.Assignment + + // Note: Removed in-memory mapping storage to prevent memory leaks + // Record-to-offset mappings are now handled by persistent storage layer + + // Return response with offset information + return &mq_agent_pb.PublishRecordResponse{ + AckSequence: assignment.Offset, // Use offset as ack sequence for now + BaseOffset: assignment.Offset, + LastOffset: assignment.Offset, + Error: "", + }, nil +} + +// PublishRecordBatch publishes a batch of records and assigns them offsets +func (integration *SMQOffsetIntegration) PublishRecordBatch( + namespace, topicName string, + partition *schema_pb.Partition, + records 
[]PublishRecordRequest, +) (*mq_agent_pb.PublishRecordResponse, error) { + + if len(records) == 0 { + return &mq_agent_pb.PublishRecordResponse{ + Error: "Empty record batch", + }, nil + } + + // Assign batch of offsets + result := integration.offsetAssigner.AssignBatchOffsets(namespace, topicName, partition, int64(len(records))) + if result.Error != nil { + return &mq_agent_pb.PublishRecordResponse{ + Error: fmt.Sprintf("Failed to assign batch offsets: %v", result.Error), + }, nil + } + + batch := result.Batch + + // Note: Removed in-memory mapping storage to prevent memory leaks + // Batch record-to-offset mappings are now handled by persistent storage layer + + return &mq_agent_pb.PublishRecordResponse{ + AckSequence: batch.LastOffset, // Use last offset as ack sequence + BaseOffset: batch.BaseOffset, + LastOffset: batch.LastOffset, + Error: "", + }, nil +} + +// CreateSubscription creates an offset-based subscription +func (integration *SMQOffsetIntegration) CreateSubscription( + subscriptionID string, + namespace, topicName string, + partition *schema_pb.Partition, + offsetType schema_pb.OffsetType, + startOffset int64, +) (*OffsetSubscription, error) { + + return integration.offsetSubscriber.CreateSubscription( + subscriptionID, + namespace, topicName, + partition, + offsetType, + startOffset, + ) +} + +// SubscribeRecords subscribes to records starting from a specific offset +func (integration *SMQOffsetIntegration) SubscribeRecords( + subscription *OffsetSubscription, + maxRecords int64, +) ([]*mq_agent_pb.SubscribeRecordResponse, error) { + + if !subscription.IsActive { + return nil, fmt.Errorf("subscription is not active") + } + + // Get the range of offsets to read + offsetRange, err := subscription.GetOffsetRange(maxRecords) + if err != nil { + return nil, fmt.Errorf("failed to get offset range: %w", err) + } + + if offsetRange.Count == 0 { + // No records available + return []*mq_agent_pb.SubscribeRecordResponse{}, nil + } + + // TODO: This is where we would integrate with SMQ's actual storage layer + // For now, return mock responses with offset information + responses := make([]*mq_agent_pb.SubscribeRecordResponse, offsetRange.Count) + + for i := int64(0); i < offsetRange.Count; i++ { + offset := offsetRange.StartOffset + i + + responses[i] = &mq_agent_pb.SubscribeRecordResponse{ + Key: []byte(fmt.Sprintf("key-%d", offset)), + Value: &schema_pb.RecordValue{}, // Mock value + TsNs: offset * 1000000, // Mock timestamp based on offset + Offset: offset, + IsEndOfStream: false, + IsEndOfTopic: false, + Error: "", + } + } + + // Advance the subscription + subscription.AdvanceOffsetBy(offsetRange.Count) + + return responses, nil +} + +// GetHighWaterMark returns the high water mark for a partition +func (integration *SMQOffsetIntegration) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + return integration.offsetAssigner.GetHighWaterMark(namespace, topicName, partition) +} + +// SeekSubscription seeks a subscription to a specific offset +func (integration *SMQOffsetIntegration) SeekSubscription( + subscriptionID string, + offset int64, +) error { + + subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID) + if err != nil { + return fmt.Errorf("subscription not found: %w", err) + } + + return subscription.SeekToOffset(offset) +} + +// GetSubscriptionLag returns the lag for a subscription +func (integration *SMQOffsetIntegration) GetSubscriptionLag(subscriptionID string) (int64, error) { + subscription, err := 
integration.offsetSubscriber.GetSubscription(subscriptionID) + if err != nil { + return 0, fmt.Errorf("subscription not found: %w", err) + } + + return subscription.GetLag() +} + +// CloseSubscription closes a subscription +func (integration *SMQOffsetIntegration) CloseSubscription(subscriptionID string) error { + return integration.offsetSubscriber.CloseSubscription(subscriptionID) +} + +// ValidateOffsetRange validates an offset range for a partition +func (integration *SMQOffsetIntegration) ValidateOffsetRange( + namespace, topicName string, + partition *schema_pb.Partition, + startOffset, endOffset int64, +) error { + + return integration.offsetSeeker.ValidateOffsetRange(namespace, topicName, partition, startOffset, endOffset) +} + +// GetAvailableOffsetRange returns the available offset range for a partition +func (integration *SMQOffsetIntegration) GetAvailableOffsetRange(namespace, topicName string, partition *schema_pb.Partition) (*OffsetRange, error) { + return integration.offsetSeeker.GetAvailableOffsetRange(namespace, topicName, partition) +} + +// PublishRecordRequest represents a record to be published +type PublishRecordRequest struct { + Key []byte + Value *schema_pb.RecordValue +} + +// OffsetMetrics provides metrics about offset usage +type OffsetMetrics struct { + PartitionCount int64 + TotalOffsets int64 + ActiveSubscriptions int64 + AverageLatency float64 +} + +// GetOffsetMetrics returns metrics about offset usage +func (integration *SMQOffsetIntegration) GetOffsetMetrics() *OffsetMetrics { + integration.mu.RLock() + defer integration.mu.RUnlock() + + // Count active subscriptions + activeSubscriptions := int64(0) + for _, subscription := range integration.offsetSubscriber.subscriptions { + if subscription.IsActive { + activeSubscriptions++ + } + } + + // Calculate total offsets from all partition managers instead of in-memory map + var totalOffsets int64 + for _, manager := range integration.offsetAssigner.registry.managers { + totalOffsets += manager.GetHighWaterMark() + } + + return &OffsetMetrics{ + PartitionCount: int64(len(integration.offsetAssigner.registry.managers)), + TotalOffsets: totalOffsets, // Now calculated from storage, not memory maps + ActiveSubscriptions: activeSubscriptions, + AverageLatency: 0.0, // TODO: Implement latency tracking + } +} + +// OffsetInfo provides detailed information about an offset +type OffsetInfo struct { + Offset int64 + Timestamp int64 + Partition *schema_pb.Partition + Exists bool +} + +// GetOffsetInfo returns detailed information about a specific offset +func (integration *SMQOffsetIntegration) GetOffsetInfo( + namespace, topicName string, + partition *schema_pb.Partition, + offset int64, +) (*OffsetInfo, error) { + + hwm, err := integration.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + exists := offset >= 0 && offset < hwm + + // TODO: Get actual timestamp from storage + timestamp := int64(0) + // Note: Timestamp lookup from in-memory map removed to prevent memory leaks + // For now, use a placeholder timestamp. In production, this should come from + // persistent storage if timestamp tracking is needed. 
+ if exists { + timestamp = time.Now().UnixNano() // Placeholder - should come from storage + } + + return &OffsetInfo{ + Offset: offset, + Timestamp: timestamp, + Partition: partition, + Exists: exists, + }, nil +} + +// PartitionOffsetInfo provides offset information for a partition +type PartitionOffsetInfo struct { + Partition *schema_pb.Partition + EarliestOffset int64 + LatestOffset int64 + HighWaterMark int64 + RecordCount int64 + ActiveSubscriptions int64 +} + +// GetPartitionOffsetInfo returns comprehensive offset information for a partition +func (integration *SMQOffsetIntegration) GetPartitionOffsetInfo(namespace, topicName string, partition *schema_pb.Partition) (*PartitionOffsetInfo, error) { + hwm, err := integration.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + earliestOffset := int64(0) + latestOffset := hwm - 1 + if hwm == 0 { + latestOffset = -1 // No records + } + + // Count active subscriptions for this partition + activeSubscriptions := int64(0) + integration.mu.RLock() + for _, subscription := range integration.offsetSubscriber.subscriptions { + if subscription.IsActive && partitionKey(subscription.Partition) == partitionKey(partition) { + activeSubscriptions++ + } + } + integration.mu.RUnlock() + + return &PartitionOffsetInfo{ + Partition: partition, + EarliestOffset: earliestOffset, + LatestOffset: latestOffset, + HighWaterMark: hwm, + RecordCount: hwm, + ActiveSubscriptions: activeSubscriptions, + }, nil +} + +// GetSubscription retrieves an existing subscription +func (integration *SMQOffsetIntegration) GetSubscription(subscriptionID string) (*OffsetSubscription, error) { + return integration.offsetSubscriber.GetSubscription(subscriptionID) +} + +// ListActiveSubscriptions returns all active subscriptions +func (integration *SMQOffsetIntegration) ListActiveSubscriptions() ([]*OffsetSubscription, error) { + integration.mu.RLock() + defer integration.mu.RUnlock() + + result := make([]*OffsetSubscription, 0) + for _, subscription := range integration.offsetSubscriber.subscriptions { + if subscription.IsActive { + result = append(result, subscription) + } + } + + return result, nil +} + +// AssignSingleOffset assigns a single offset for a partition +func (integration *SMQOffsetIntegration) AssignSingleOffset(namespace, topicName string, partition *schema_pb.Partition) *AssignmentResult { + return integration.offsetAssigner.AssignSingleOffset(namespace, topicName, partition) +} + +// AssignBatchOffsets assigns a batch of offsets for a partition +func (integration *SMQOffsetIntegration) AssignBatchOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) *AssignmentResult { + return integration.offsetAssigner.AssignBatchOffsets(namespace, topicName, partition, count) +} + +// Reset resets the integration layer state (for testing) +func (integration *SMQOffsetIntegration) Reset() { + integration.mu.Lock() + defer integration.mu.Unlock() + + // Note: No in-memory maps to clear (removed to prevent memory leaks) + + // Close all subscriptions + for _, subscription := range integration.offsetSubscriber.subscriptions { + subscription.IsActive = false + } + integration.offsetSubscriber.subscriptions = make(map[string]*OffsetSubscription) + + // Reset the registries by creating new ones with the same storage + // This ensures that partition managers start fresh + registry := NewPartitionOffsetRegistry(integration.offsetAssigner.registry.storage) + 
integration.offsetAssigner.registry = registry + integration.offsetSubscriber.offsetRegistry = registry + integration.offsetSeeker.offsetRegistry = registry +} diff --git a/weed/mq/offset/integration_test.go b/weed/mq/offset/integration_test.go new file mode 100644 index 000000000..35299be65 --- /dev/null +++ b/weed/mq/offset/integration_test.go @@ -0,0 +1,544 @@ +package offset + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestSMQOffsetIntegration_PublishRecord(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish a single record + response, err := integration.PublishRecord( + "test-namespace", "test-topic", + partition, + []byte("test-key"), + &schema_pb.RecordValue{}, + ) + + if err != nil { + t.Fatalf("Failed to publish record: %v", err) + } + + if response.Error != "" { + t.Errorf("Expected no error, got: %s", response.Error) + } + + if response.BaseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", response.BaseOffset) + } + + if response.LastOffset != 0 { + t.Errorf("Expected last offset 0, got %d", response.LastOffset) + } +} + +func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Create batch of records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + + // Publish batch + response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + if err != nil { + t.Fatalf("Failed to publish record batch: %v", err) + } + + if response.Error != "" { + t.Errorf("Expected no error, got: %s", response.Error) + } + + if response.BaseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", response.BaseOffset) + } + + if response.LastOffset != 2 { + t.Errorf("Expected last offset 2, got %d", response.LastOffset) + } + + // Verify high water mark + hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get high water mark: %v", err) + } + + if hwm != 3 { + t.Errorf("Expected high water mark 3, got %d", hwm) + } +} + +func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish empty batch + response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, []PublishRecordRequest{}) + if err != nil { + t.Fatalf("Failed to publish empty batch: %v", err) + } + + if response.Error == "" { + t.Error("Expected error for empty batch") + } +} + +func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish some records first + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Create subscription + sub, err := integration.CreateSubscription( + "test-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 
0, + ) + + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + if sub.ID != "test-sub" { + t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID) + } + + if sub.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", sub.StartOffset) + } +} + +func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish some records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Create subscription + sub, err := integration.CreateSubscription( + "test-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Subscribe to records + responses, err := integration.SubscribeRecords(sub, 2) + if err != nil { + t.Fatalf("Failed to subscribe to records: %v", err) + } + + if len(responses) != 2 { + t.Errorf("Expected 2 responses, got %d", len(responses)) + } + + // Check offset progression + if responses[0].Offset != 0 { + t.Errorf("Expected first record offset 0, got %d", responses[0].Offset) + } + + if responses[1].Offset != 1 { + t.Errorf("Expected second record offset 1, got %d", responses[1].Offset) + } + + // Check subscription advancement + if sub.CurrentOffset != 2 { + t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset) + } +} + +func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Create subscription on empty partition + sub, err := integration.CreateSubscription( + "empty-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Subscribe to records (should return empty) + responses, err := integration.SubscribeRecords(sub, 10) + if err != nil { + t.Fatalf("Failed to subscribe to empty partition: %v", err) + } + + if len(responses) != 0 { + t.Errorf("Expected 0 responses from empty partition, got %d", len(responses)) + } +} + +func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key4"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key5"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Create subscription + sub, err := integration.CreateSubscription( + "seek-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Seek to offset 3 + err = integration.SeekSubscription("seek-sub", 3) + if err != nil { + t.Fatalf("Failed to seek subscription: %v", err) + 
} + + if sub.CurrentOffset != 3 { + t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset) + } + + // Subscribe from new position + responses, err := integration.SubscribeRecords(sub, 2) + if err != nil { + t.Fatalf("Failed to subscribe after seek: %v", err) + } + + if len(responses) != 2 { + t.Errorf("Expected 2 responses after seek, got %d", len(responses)) + } + + if responses[0].Offset != 3 { + t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset) + } +} + +func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Create subscription at offset 1 + sub, err := integration.CreateSubscription( + "lag-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_EXACT_OFFSET, + 1, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Get lag + lag, err := integration.GetSubscriptionLag("lag-sub") + if err != nil { + t.Fatalf("Failed to get subscription lag: %v", err) + } + + expectedLag := int64(3 - 1) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d, got %d", expectedLag, lag) + } + + // Advance subscription and check lag again + integration.SubscribeRecords(sub, 1) + + lag, err = integration.GetSubscriptionLag("lag-sub") + if err != nil { + t.Fatalf("Failed to get lag after advance: %v", err) + } + + expectedLag = int64(3 - 2) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag) + } +} + +func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Create subscription + _, err := integration.CreateSubscription( + "close-sub", + "test-namespace", "test-topic", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Close subscription + err = integration.CloseSubscription("close-sub") + if err != nil { + t.Fatalf("Failed to close subscription: %v", err) + } + + // Try to get lag (should fail) + _, err = integration.GetSubscriptionLag("close-sub") + if err == nil { + t.Error("Expected error when getting lag for closed subscription") + } +} + +func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish some records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Test valid range + err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 2) + if err != nil { + t.Errorf("Valid range should not return error: %v", err) + } + + // Test invalid range (beyond hwm) + err = 
integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 5) + if err == nil { + t.Error("Expected error for range beyond high water mark") + } +} + +func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Test empty partition + offsetRange, err := integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get available range for empty partition: %v", err) + } + + if offsetRange.Count != 0 { + t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count) + } + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Test with data + offsetRange, err = integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get available range: %v", err) + } + + if offsetRange.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset) + } + + if offsetRange.EndOffset != 1 { + t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset) + } + + if offsetRange.Count != 2 { + t.Errorf("Expected count 2, got %d", offsetRange.Count) + } +} + +func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Initial metrics + metrics := integration.GetOffsetMetrics() + if metrics.TotalOffsets != 0 { + t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets) + } + + if metrics.ActiveSubscriptions != 0 { + t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions) + } + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Create subscriptions + integration.CreateSubscription("sub1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + integration.CreateSubscription("sub2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + + // Check updated metrics + metrics = integration.GetOffsetMetrics() + if metrics.TotalOffsets != 2 { + t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets) + } + + if metrics.ActiveSubscriptions != 2 { + t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions) + } + + if metrics.PartitionCount != 1 { + t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount) + } +} + +func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Test non-existent offset + info, err := integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0) + if err != nil { + t.Fatalf("Failed to get offset info: %v", err) + } + + if info.Exists { + t.Error("Offset should not exist in empty partition") + } + + // Publish record + integration.PublishRecord("test-namespace", "test-topic", partition, []byte("key1"), 
&schema_pb.RecordValue{}) + + // Test existing offset + info, err = integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0) + if err != nil { + t.Fatalf("Failed to get offset info for existing offset: %v", err) + } + + if !info.Exists { + t.Error("Offset should exist after publishing") + } + + if info.Offset != 0 { + t.Errorf("Expected offset 0, got %d", info.Offset) + } +} + +func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Test empty partition + info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get partition offset info: %v", err) + } + + if info.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) + } + + if info.LatestOffset != -1 { + t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset) + } + + if info.HighWaterMark != 0 { + t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark) + } + + if info.RecordCount != 0 { + t.Errorf("Expected record count 0, got %d", info.RecordCount) + } + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) + + // Create subscription + integration.CreateSubscription("test-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + + // Test with data + info, err = integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get partition offset info with data: %v", err) + } + + if info.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) + } + + if info.LatestOffset != 2 { + t.Errorf("Expected latest offset 2, got %d", info.LatestOffset) + } + + if info.HighWaterMark != 3 { + t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark) + } + + if info.RecordCount != 3 { + t.Errorf("Expected record count 3, got %d", info.RecordCount) + } + + if info.ActiveSubscriptions != 1 { + t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions) + } +} diff --git a/weed/mq/offset/manager.go b/weed/mq/offset/manager.go new file mode 100644 index 000000000..01976a8bf --- /dev/null +++ b/weed/mq/offset/manager.go @@ -0,0 +1,343 @@ +package offset + +import ( + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// PartitionOffsetManager manages sequential offset assignment for a single partition +type PartitionOffsetManager struct { + mu sync.RWMutex + namespace string + topicName string + partition *schema_pb.Partition + nextOffset int64 + + // Checkpointing for recovery + lastCheckpoint int64 + checkpointInterval int64 + storage OffsetStorage +} + +// OffsetStorage interface for persisting offset state +type OffsetStorage interface { + // SaveCheckpoint persists the current offset state for recovery + // Takes topic information along with partition to determine the correct storage location + SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error + + // LoadCheckpoint retrieves the last saved offset state + LoadCheckpoint(namespace, topicName string, 
partition *schema_pb.Partition) (int64, error) + + // GetHighestOffset scans storage to find the highest assigned offset + GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) +} + +// NewPartitionOffsetManager creates a new offset manager for a partition +func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) { + manager := &PartitionOffsetManager{ + namespace: namespace, + topicName: topicName, + partition: partition, + checkpointInterval: 1, // Checkpoint every offset for immediate persistence + storage: storage, + } + + // Recover offset state + if err := manager.recover(); err != nil { + return nil, fmt.Errorf("failed to recover offset state: %w", err) + } + + return manager, nil +} + +// AssignOffset assigns the next sequential offset +func (m *PartitionOffsetManager) AssignOffset() int64 { + var shouldCheckpoint bool + var checkpointOffset int64 + + m.mu.Lock() + offset := m.nextOffset + m.nextOffset++ + + // Check if we should checkpoint (but don't do it inside the lock) + if offset-m.lastCheckpoint >= m.checkpointInterval { + shouldCheckpoint = true + checkpointOffset = offset + } + m.mu.Unlock() + + // Checkpoint outside the lock to avoid deadlock + if shouldCheckpoint { + m.checkpoint(checkpointOffset) + } + + return offset +} + +// AssignOffsets assigns a batch of sequential offsets +func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) { + var shouldCheckpoint bool + var checkpointOffset int64 + + m.mu.Lock() + baseOffset = m.nextOffset + lastOffset = m.nextOffset + count - 1 + m.nextOffset += count + + // Check if we should checkpoint (but don't do it inside the lock) + if lastOffset-m.lastCheckpoint >= m.checkpointInterval { + shouldCheckpoint = true + checkpointOffset = lastOffset + } + m.mu.Unlock() + + // Checkpoint outside the lock to avoid deadlock + if shouldCheckpoint { + m.checkpoint(checkpointOffset) + } + + return baseOffset, lastOffset +} + +// GetNextOffset returns the next offset that will be assigned +func (m *PartitionOffsetManager) GetNextOffset() int64 { + m.mu.RLock() + defer m.mu.RUnlock() + return m.nextOffset +} + +// GetHighWaterMark returns the high water mark (next offset) +func (m *PartitionOffsetManager) GetHighWaterMark() int64 { + return m.GetNextOffset() +} + +// recover restores offset state from storage +func (m *PartitionOffsetManager) recover() error { + var checkpointOffset int64 = -1 + var highestOffset int64 = -1 + + // Try to load checkpoint + if offset, err := m.storage.LoadCheckpoint(m.namespace, m.topicName, m.partition); err == nil && offset >= 0 { + checkpointOffset = offset + } + + // Try to scan storage for highest offset + if offset, err := m.storage.GetHighestOffset(m.namespace, m.topicName, m.partition); err == nil && offset >= 0 { + highestOffset = offset + } + + // Use the higher of checkpoint or storage scan + if checkpointOffset >= 0 && highestOffset >= 0 { + if highestOffset > checkpointOffset { + m.nextOffset = highestOffset + 1 + m.lastCheckpoint = highestOffset + } else { + m.nextOffset = checkpointOffset + 1 + m.lastCheckpoint = checkpointOffset + } + } else if checkpointOffset >= 0 { + m.nextOffset = checkpointOffset + 1 + m.lastCheckpoint = checkpointOffset + } else if highestOffset >= 0 { + m.nextOffset = highestOffset + 1 + m.lastCheckpoint = highestOffset + } else { + // No data exists, start from 0 + m.nextOffset = 0 + m.lastCheckpoint = -1 
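+ // No checkpoint and no stored records: assignment starts from offset 0. In the branches above,
+ // the manager instead resumes from max(checkpoint, highest stored offset) + 1.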
+ } + + return nil +} + +// checkpoint saves the current offset state +func (m *PartitionOffsetManager) checkpoint(offset int64) { + if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, offset); err != nil { + // Log error but don't fail - checkpointing is for optimization + fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err) + return + } + + m.mu.Lock() + m.lastCheckpoint = offset + m.mu.Unlock() +} + +// PartitionOffsetRegistry manages offset managers for multiple partitions +type PartitionOffsetRegistry struct { + mu sync.RWMutex + managers map[string]*PartitionOffsetManager + storage OffsetStorage +} + +// NewPartitionOffsetRegistry creates a new registry +func NewPartitionOffsetRegistry(storage OffsetStorage) *PartitionOffsetRegistry { + return &PartitionOffsetRegistry{ + managers: make(map[string]*PartitionOffsetManager), + storage: storage, + } +} + +// GetManager returns the offset manager for a partition, creating it if needed +func (r *PartitionOffsetRegistry) GetManager(namespace, topicName string, partition *schema_pb.Partition) (*PartitionOffsetManager, error) { + // CRITICAL FIX: Use TopicPartitionKey to ensure each topic has its own offset manager + key := TopicPartitionKey(namespace, topicName, partition) + + r.mu.RLock() + manager, exists := r.managers[key] + r.mu.RUnlock() + + if exists { + return manager, nil + } + + // Create new manager + r.mu.Lock() + defer r.mu.Unlock() + + // Double-check after acquiring write lock + if manager, exists := r.managers[key]; exists { + return manager, nil + } + + manager, err := NewPartitionOffsetManager(namespace, topicName, partition, r.storage) + if err != nil { + return nil, err + } + + r.managers[key] = manager + return manager, nil +} + +// AssignOffset assigns an offset for the given partition +func (r *PartitionOffsetRegistry) AssignOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + manager, err := r.GetManager(namespace, topicName, partition) + if err != nil { + return 0, err + } + + assignedOffset := manager.AssignOffset() + + return assignedOffset, nil +} + +// AssignOffsets assigns a batch of offsets for the given partition +func (r *PartitionOffsetRegistry) AssignOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) (baseOffset, lastOffset int64, err error) { + manager, err := r.GetManager(namespace, topicName, partition) + if err != nil { + return 0, 0, err + } + + baseOffset, lastOffset = manager.AssignOffsets(count) + return baseOffset, lastOffset, nil +} + +// GetHighWaterMark returns the high water mark for a partition +func (r *PartitionOffsetRegistry) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + manager, err := r.GetManager(namespace, topicName, partition) + if err != nil { + return 0, err + } + + return manager.GetHighWaterMark(), nil +} + +// TopicPartitionKey generates a unique key for a topic-partition combination +// This is the canonical key format used across the offset management system +func TopicPartitionKey(namespace, topicName string, partition *schema_pb.Partition) string { + return fmt.Sprintf("%s/%s/ring:%d:range:%d-%d", + namespace, topicName, + partition.RingSize, partition.RangeStart, partition.RangeStop) +} + +// PartitionKey generates a unique key for a partition (without topic context) +// Note: UnixTimeNs is intentionally excluded from the key because it represents +// partition creation time, not partition identity. 
Using it would cause offset +// tracking to reset whenever a partition is recreated or looked up again. +// DEPRECATED: Use TopicPartitionKey for production code to avoid key collisions +func PartitionKey(partition *schema_pb.Partition) string { + return fmt.Sprintf("ring:%d:range:%d-%d", + partition.RingSize, partition.RangeStart, partition.RangeStop) +} + +// partitionKey is the internal lowercase version for backward compatibility within this package +func partitionKey(partition *schema_pb.Partition) string { + return PartitionKey(partition) +} + +// OffsetAssignment represents an assigned offset with metadata +type OffsetAssignment struct { + Offset int64 + Timestamp int64 + Partition *schema_pb.Partition +} + +// BatchOffsetAssignment represents a batch of assigned offsets +type BatchOffsetAssignment struct { + BaseOffset int64 + LastOffset int64 + Count int64 + Timestamp int64 + Partition *schema_pb.Partition +} + +// AssignmentResult contains the result of offset assignment +type AssignmentResult struct { + Assignment *OffsetAssignment + Batch *BatchOffsetAssignment + Error error +} + +// OffsetAssigner provides high-level offset assignment operations +type OffsetAssigner struct { + registry *PartitionOffsetRegistry +} + +// NewOffsetAssigner creates a new offset assigner +func NewOffsetAssigner(storage OffsetStorage) *OffsetAssigner { + return &OffsetAssigner{ + registry: NewPartitionOffsetRegistry(storage), + } +} + +// AssignSingleOffset assigns a single offset with timestamp +func (a *OffsetAssigner) AssignSingleOffset(namespace, topicName string, partition *schema_pb.Partition) *AssignmentResult { + offset, err := a.registry.AssignOffset(namespace, topicName, partition) + if err != nil { + return &AssignmentResult{Error: err} + } + + return &AssignmentResult{ + Assignment: &OffsetAssignment{ + Offset: offset, + Timestamp: time.Now().UnixNano(), + Partition: partition, + }, + } +} + +// AssignBatchOffsets assigns a batch of offsets with timestamp +func (a *OffsetAssigner) AssignBatchOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) *AssignmentResult { + baseOffset, lastOffset, err := a.registry.AssignOffsets(namespace, topicName, partition, count) + if err != nil { + return &AssignmentResult{Error: err} + } + + return &AssignmentResult{ + Batch: &BatchOffsetAssignment{ + BaseOffset: baseOffset, + LastOffset: lastOffset, + Count: count, + Timestamp: time.Now().UnixNano(), + Partition: partition, + }, + } +} + +// GetHighWaterMark returns the high water mark for a partition +func (a *OffsetAssigner) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + return a.registry.GetHighWaterMark(namespace, topicName, partition) +} diff --git a/weed/mq/offset/manager_test.go b/weed/mq/offset/manager_test.go new file mode 100644 index 000000000..0db301e84 --- /dev/null +++ b/weed/mq/offset/manager_test.go @@ -0,0 +1,388 @@ +package offset + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func createTestPartition() *schema_pb.Partition { + return &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } +} + +func TestPartitionOffsetManager_BasicAssignment(t *testing.T) { + storage := NewInMemoryOffsetStorage() + partition := createTestPartition() + + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + t.Fatalf("Failed to create offset manager: %v", err) + } 
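+ // A fresh partition has no checkpoint and no records, so the first assigned offset is expected to be 0.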
+ + // Test sequential offset assignment + for i := int64(0); i < 10; i++ { + offset := manager.AssignOffset() + if offset != i { + t.Errorf("Expected offset %d, got %d", i, offset) + } + } + + // Test high water mark + hwm := manager.GetHighWaterMark() + if hwm != 10 { + t.Errorf("Expected high water mark 10, got %d", hwm) + } +} + +func TestPartitionOffsetManager_BatchAssignment(t *testing.T) { + storage := NewInMemoryOffsetStorage() + partition := createTestPartition() + + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + t.Fatalf("Failed to create offset manager: %v", err) + } + + // Assign batch of 5 offsets + baseOffset, lastOffset := manager.AssignOffsets(5) + if baseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", baseOffset) + } + if lastOffset != 4 { + t.Errorf("Expected last offset 4, got %d", lastOffset) + } + + // Assign another batch + baseOffset, lastOffset = manager.AssignOffsets(3) + if baseOffset != 5 { + t.Errorf("Expected base offset 5, got %d", baseOffset) + } + if lastOffset != 7 { + t.Errorf("Expected last offset 7, got %d", lastOffset) + } + + // Check high water mark + hwm := manager.GetHighWaterMark() + if hwm != 8 { + t.Errorf("Expected high water mark 8, got %d", hwm) + } +} + +func TestPartitionOffsetManager_Recovery(t *testing.T) { + storage := NewInMemoryOffsetStorage() + partition := createTestPartition() + + // Create manager and assign some offsets + manager1, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + t.Fatalf("Failed to create offset manager: %v", err) + } + + // Assign offsets and simulate records + for i := 0; i < 150; i++ { // More than checkpoint interval + offset := manager1.AssignOffset() + storage.AddRecord("test-namespace", "test-topic", partition, offset) + } + + // Wait for checkpoint to complete + time.Sleep(100 * time.Millisecond) + + // Create new manager (simulates restart) + manager2, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + t.Fatalf("Failed to create offset manager after recovery: %v", err) + } + + // Next offset should continue from checkpoint + 1 + // With checkpoint interval 100, checkpoint happens at offset 100 + // So recovery should start from 101, but we assigned 150 offsets (0-149) + // The checkpoint should be at 100, so next offset should be 101 + // But since we have records up to 149, it should recover from storage scan + nextOffset := manager2.AssignOffset() + if nextOffset != 150 { + t.Errorf("Expected next offset 150 after recovery, got %d", nextOffset) + } +} + +func TestPartitionOffsetManager_RecoveryFromStorage(t *testing.T) { + storage := NewInMemoryOffsetStorage() + partition := createTestPartition() + + // Simulate existing records in storage without checkpoint + for i := int64(0); i < 50; i++ { + storage.AddRecord("test-namespace", "test-topic", partition, i) + } + + // Create manager - should recover from storage scan + manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage) + if err != nil { + t.Fatalf("Failed to create offset manager: %v", err) + } + + // Next offset should be 50 + nextOffset := manager.AssignOffset() + if nextOffset != 50 { + t.Errorf("Expected next offset 50 after storage recovery, got %d", nextOffset) + } +} + +func TestPartitionOffsetRegistry_MultiplePartitions(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := 
NewPartitionOffsetRegistry(storage) + + // Create different partitions + partition1 := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } + + partition2 := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 32, + RangeStop: 63, + UnixTimeNs: time.Now().UnixNano(), + } + + // Assign offsets to different partitions + offset1, err := registry.AssignOffset("test-namespace", "test-topic", partition1) + if err != nil { + t.Fatalf("Failed to assign offset to partition1: %v", err) + } + if offset1 != 0 { + t.Errorf("Expected offset 0 for partition1, got %d", offset1) + } + + offset2, err := registry.AssignOffset("test-namespace", "test-topic", partition2) + if err != nil { + t.Fatalf("Failed to assign offset to partition2: %v", err) + } + if offset2 != 0 { + t.Errorf("Expected offset 0 for partition2, got %d", offset2) + } + + // Assign more offsets to partition1 + offset1_2, err := registry.AssignOffset("test-namespace", "test-topic", partition1) + if err != nil { + t.Fatalf("Failed to assign second offset to partition1: %v", err) + } + if offset1_2 != 1 { + t.Errorf("Expected offset 1 for partition1, got %d", offset1_2) + } + + // Partition2 should still be at 0 for next assignment + offset2_2, err := registry.AssignOffset("test-namespace", "test-topic", partition2) + if err != nil { + t.Fatalf("Failed to assign second offset to partition2: %v", err) + } + if offset2_2 != 1 { + t.Errorf("Expected offset 1 for partition2, got %d", offset2_2) + } +} + +func TestPartitionOffsetRegistry_BatchAssignment(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + partition := createTestPartition() + + // Assign batch of offsets + baseOffset, lastOffset, err := registry.AssignOffsets("test-namespace", "test-topic", partition, 10) + if err != nil { + t.Fatalf("Failed to assign batch offsets: %v", err) + } + + if baseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", baseOffset) + } + if lastOffset != 9 { + t.Errorf("Expected last offset 9, got %d", lastOffset) + } + + // Get high water mark + hwm, err := registry.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get high water mark: %v", err) + } + if hwm != 10 { + t.Errorf("Expected high water mark 10, got %d", hwm) + } +} + +func TestOffsetAssigner_SingleAssignment(t *testing.T) { + storage := NewInMemoryOffsetStorage() + assigner := NewOffsetAssigner(storage) + partition := createTestPartition() + + // Assign single offset + result := assigner.AssignSingleOffset("test-namespace", "test-topic", partition) + if result.Error != nil { + t.Fatalf("Failed to assign single offset: %v", result.Error) + } + + if result.Assignment == nil { + t.Fatal("Assignment result is nil") + } + + if result.Assignment.Offset != 0 { + t.Errorf("Expected offset 0, got %d", result.Assignment.Offset) + } + + if result.Assignment.Partition != partition { + t.Error("Partition mismatch in assignment") + } + + if result.Assignment.Timestamp <= 0 { + t.Error("Timestamp should be set") + } +} + +func TestOffsetAssigner_BatchAssignment(t *testing.T) { + storage := NewInMemoryOffsetStorage() + assigner := NewOffsetAssigner(storage) + partition := createTestPartition() + + // Assign batch of offsets + result := assigner.AssignBatchOffsets("test-namespace", "test-topic", partition, 5) + if result.Error != nil { + t.Fatalf("Failed to assign batch offsets: %v", result.Error) + } + + if result.Batch == nil { + 
t.Fatal("Batch result is nil") + } + + if result.Batch.BaseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", result.Batch.BaseOffset) + } + + if result.Batch.LastOffset != 4 { + t.Errorf("Expected last offset 4, got %d", result.Batch.LastOffset) + } + + if result.Batch.Count != 5 { + t.Errorf("Expected count 5, got %d", result.Batch.Count) + } + + if result.Batch.Timestamp <= 0 { + t.Error("Timestamp should be set") + } +} + +func TestOffsetAssigner_HighWaterMark(t *testing.T) { + storage := NewInMemoryOffsetStorage() + assigner := NewOffsetAssigner(storage) + partition := createTestPartition() + + // Initially should be 0 + hwm, err := assigner.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get initial high water mark: %v", err) + } + if hwm != 0 { + t.Errorf("Expected initial high water mark 0, got %d", hwm) + } + + // Assign some offsets + assigner.AssignBatchOffsets("test-namespace", "test-topic", partition, 10) + + // High water mark should be updated + hwm, err = assigner.GetHighWaterMark("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get high water mark after assignment: %v", err) + } + if hwm != 10 { + t.Errorf("Expected high water mark 10, got %d", hwm) + } +} + +func TestPartitionKey(t *testing.T) { + partition1 := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: 1234567890, + } + + partition2 := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: 1234567890, + } + + partition3 := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 32, + RangeStop: 63, + UnixTimeNs: 1234567890, + } + + key1 := partitionKey(partition1) + key2 := partitionKey(partition2) + key3 := partitionKey(partition3) + + // Same partitions should have same key + if key1 != key2 { + t.Errorf("Same partitions should have same key: %s vs %s", key1, key2) + } + + // Different partitions should have different keys + if key1 == key3 { + t.Errorf("Different partitions should have different keys: %s vs %s", key1, key3) + } +} + +func TestConcurrentOffsetAssignment(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + partition := createTestPartition() + + const numGoroutines = 10 + const offsetsPerGoroutine = 100 + + results := make(chan int64, numGoroutines*offsetsPerGoroutine) + + // Start concurrent offset assignments + for i := 0; i < numGoroutines; i++ { + go func() { + for j := 0; j < offsetsPerGoroutine; j++ { + offset, err := registry.AssignOffset("test-namespace", "test-topic", partition) + if err != nil { + t.Errorf("Failed to assign offset: %v", err) + return + } + results <- offset + } + }() + } + + // Collect all results + offsets := make(map[int64]bool) + for i := 0; i < numGoroutines*offsetsPerGoroutine; i++ { + offset := <-results + if offsets[offset] { + t.Errorf("Duplicate offset assigned: %d", offset) + } + offsets[offset] = true + } + + // Verify we got all expected offsets + expectedCount := numGoroutines * offsetsPerGoroutine + if len(offsets) != expectedCount { + t.Errorf("Expected %d unique offsets, got %d", expectedCount, len(offsets)) + } + + // Verify offsets are in expected range + for offset := range offsets { + if offset < 0 || offset >= int64(expectedCount) { + t.Errorf("Offset %d is out of expected range [0, %d)", offset, expectedCount) + } + } +} diff --git a/weed/mq/offset/memory_storage_test.go b/weed/mq/offset/memory_storage_test.go new file mode 100644 
index 000000000..4434e1eb6 --- /dev/null +++ b/weed/mq/offset/memory_storage_test.go @@ -0,0 +1,228 @@ +package offset + +import ( + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// recordEntry holds a record with timestamp for TTL cleanup +type recordEntry struct { + exists bool + timestamp time.Time +} + +// InMemoryOffsetStorage provides an in-memory implementation of OffsetStorage for testing ONLY +// WARNING: This should NEVER be used in production - use FilerOffsetStorage or SQLOffsetStorage instead +type InMemoryOffsetStorage struct { + mu sync.RWMutex + checkpoints map[string]int64 // partition key -> offset + records map[string]map[int64]*recordEntry // partition key -> offset -> entry with timestamp + + // Memory leak protection + maxRecordsPerPartition int // Maximum records to keep per partition + recordTTL time.Duration // TTL for record entries + lastCleanup time.Time // Last cleanup time + cleanupInterval time.Duration // How often to run cleanup +} + +// NewInMemoryOffsetStorage creates a new in-memory storage with memory leak protection +// FOR TESTING ONLY - do not use in production +func NewInMemoryOffsetStorage() *InMemoryOffsetStorage { + return &InMemoryOffsetStorage{ + checkpoints: make(map[string]int64), + records: make(map[string]map[int64]*recordEntry), + maxRecordsPerPartition: 10000, // Limit to 10K records per partition + recordTTL: 1 * time.Hour, // Records expire after 1 hour + cleanupInterval: 5 * time.Minute, // Cleanup every 5 minutes + lastCleanup: time.Now(), + } +} + +// SaveCheckpoint saves the checkpoint for a partition +func (s *InMemoryOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Use TopicPartitionKey for consistency with other storage implementations + key := TopicPartitionKey(namespace, topicName, partition) + s.checkpoints[key] = offset + return nil +} + +// LoadCheckpoint loads the checkpoint for a partition +func (s *InMemoryOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // Use TopicPartitionKey to match SaveCheckpoint + key := TopicPartitionKey(namespace, topicName, partition) + offset, exists := s.checkpoints[key] + if !exists { + return -1, fmt.Errorf("no checkpoint found") + } + + return offset, nil +} + +// GetHighestOffset finds the highest offset in storage for a partition +func (s *InMemoryOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // Use TopicPartitionKey to match SaveCheckpoint + key := TopicPartitionKey(namespace, topicName, partition) + offsets, exists := s.records[key] + if !exists || len(offsets) == 0 { + return -1, fmt.Errorf("no records found") + } + + var highest int64 = -1 + for offset, entry := range offsets { + if entry.exists && offset > highest { + highest = offset + } + } + + return highest, nil +} + +// AddRecord simulates storing a record with an offset (for testing) +func (s *InMemoryOffsetStorage) AddRecord(namespace, topicName string, partition *schema_pb.Partition, offset int64) { + s.mu.Lock() + defer s.mu.Unlock() + + // Use TopicPartitionKey to match GetHighestOffset + key := TopicPartitionKey(namespace, topicName, partition) + if s.records[key] == nil { + s.records[key] = make(map[int64]*recordEntry) + } + + // Add record with current timestamp + 
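+ // The stored timestamp lets cleanupIfNeeded expire this entry once it is older than recordTTL.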
s.records[key][offset] = &recordEntry{ + exists: true, + timestamp: time.Now(), + } + + // Trigger cleanup if needed (memory leak protection) + s.cleanupIfNeeded() +} + +// GetRecordCount returns the number of records for a partition (for testing) +func (s *InMemoryOffsetStorage) GetRecordCount(namespace, topicName string, partition *schema_pb.Partition) int { + s.mu.RLock() + defer s.mu.RUnlock() + + // Use TopicPartitionKey to match GetHighestOffset + key := TopicPartitionKey(namespace, topicName, partition) + if offsets, exists := s.records[key]; exists { + count := 0 + for _, entry := range offsets { + if entry.exists { + count++ + } + } + return count + } + return 0 +} + +// Clear removes all data (for testing) +func (s *InMemoryOffsetStorage) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + + s.checkpoints = make(map[string]int64) + s.records = make(map[string]map[int64]*recordEntry) + s.lastCleanup = time.Now() +} + +// Reset removes all data (implements resettable interface for shutdown) +func (s *InMemoryOffsetStorage) Reset() error { + s.Clear() + return nil +} + +// cleanupIfNeeded performs memory leak protection cleanup +// This method assumes the caller already holds the write lock +func (s *InMemoryOffsetStorage) cleanupIfNeeded() { + now := time.Now() + + // Only cleanup if enough time has passed + if now.Sub(s.lastCleanup) < s.cleanupInterval { + return + } + + s.lastCleanup = now + cutoff := now.Add(-s.recordTTL) + + // Clean up expired records and enforce size limits + for partitionKey, offsets := range s.records { + // Remove expired records + for offset, entry := range offsets { + if entry.timestamp.Before(cutoff) { + delete(offsets, offset) + } + } + + // Enforce size limit per partition + if len(offsets) > s.maxRecordsPerPartition { + // Keep only the most recent records + type offsetTime struct { + offset int64 + time time.Time + } + + var entries []offsetTime + for offset, entry := range offsets { + entries = append(entries, offsetTime{offset: offset, time: entry.timestamp}) + } + + // Sort by timestamp (newest first) + for i := 0; i < len(entries)-1; i++ { + for j := i + 1; j < len(entries); j++ { + if entries[i].time.Before(entries[j].time) { + entries[i], entries[j] = entries[j], entries[i] + } + } + } + + // Keep only the newest maxRecordsPerPartition entries + newOffsets := make(map[int64]*recordEntry) + for i := 0; i < s.maxRecordsPerPartition && i < len(entries); i++ { + offset := entries[i].offset + newOffsets[offset] = offsets[offset] + } + + s.records[partitionKey] = newOffsets + } + + // Remove empty partition maps + if len(offsets) == 0 { + delete(s.records, partitionKey) + } + } +} + +// GetMemoryStats returns memory usage statistics for monitoring +func (s *InMemoryOffsetStorage) GetMemoryStats() map[string]interface{} { + s.mu.RLock() + defer s.mu.RUnlock() + + totalRecords := 0 + partitionCount := len(s.records) + + for _, offsets := range s.records { + totalRecords += len(offsets) + } + + return map[string]interface{}{ + "total_partitions": partitionCount, + "total_records": totalRecords, + "max_records_per_partition": s.maxRecordsPerPartition, + "record_ttl_hours": s.recordTTL.Hours(), + "last_cleanup": s.lastCleanup, + } +} diff --git a/weed/mq/offset/migration.go b/weed/mq/offset/migration.go new file mode 100644 index 000000000..106129206 --- /dev/null +++ b/weed/mq/offset/migration.go @@ -0,0 +1,302 @@ +package offset + +import ( + "database/sql" + "fmt" + "time" +) + +// MigrationVersion represents a database migration version +type 
MigrationVersion struct { + Version int + Description string + SQL string +} + +// GetMigrations returns all available migrations for offset storage +func GetMigrations() []MigrationVersion { + return []MigrationVersion{ + { + Version: 1, + Description: "Create initial offset storage tables", + SQL: ` + -- Partition offset checkpoints table + -- TODO: Add _index as computed column when supported by database + CREATE TABLE IF NOT EXISTS partition_offset_checkpoints ( + partition_key TEXT PRIMARY KEY, + ring_size INTEGER NOT NULL, + range_start INTEGER NOT NULL, + range_stop INTEGER NOT NULL, + unix_time_ns INTEGER NOT NULL, + checkpoint_offset INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + + -- Offset mappings table for detailed tracking + -- TODO: Add _index as computed column when supported by database + CREATE TABLE IF NOT EXISTS offset_mappings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + partition_key TEXT NOT NULL, + kafka_offset INTEGER NOT NULL, + smq_timestamp INTEGER NOT NULL, + message_size INTEGER NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE(partition_key, kafka_offset) + ); + + -- Schema migrations tracking table + CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + description TEXT NOT NULL, + applied_at INTEGER NOT NULL + ); + `, + }, + { + Version: 2, + Description: "Add indexes for performance optimization", + SQL: ` + -- Indexes for performance + CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition + ON partition_offset_checkpoints(partition_key); + + CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset + ON offset_mappings(partition_key, kafka_offset); + + CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp + ON offset_mappings(partition_key, smq_timestamp); + + CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at + ON offset_mappings(created_at); + `, + }, + { + Version: 3, + Description: "Add partition metadata table for enhanced tracking", + SQL: ` + -- Partition metadata table + CREATE TABLE IF NOT EXISTS partition_metadata ( + partition_key TEXT PRIMARY KEY, + ring_size INTEGER NOT NULL, + range_start INTEGER NOT NULL, + range_stop INTEGER NOT NULL, + unix_time_ns INTEGER NOT NULL, + created_at INTEGER NOT NULL, + last_activity_at INTEGER NOT NULL, + record_count INTEGER DEFAULT 0, + total_size INTEGER DEFAULT 0 + ); + + -- Index for partition metadata + CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity + ON partition_metadata(last_activity_at); + `, + }, + } +} + +// MigrationManager handles database schema migrations +type MigrationManager struct { + db *sql.DB +} + +// NewMigrationManager creates a new migration manager +func NewMigrationManager(db *sql.DB) *MigrationManager { + return &MigrationManager{db: db} +} + +// GetCurrentVersion returns the current schema version +func (m *MigrationManager) GetCurrentVersion() (int, error) { + // First, ensure the migrations table exists + _, err := m.db.Exec(` + CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + description TEXT NOT NULL, + applied_at INTEGER NOT NULL + ) + `) + if err != nil { + return 0, fmt.Errorf("failed to create migrations table: %w", err) + } + + var version sql.NullInt64 + err = m.db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&version) + if err != nil { + return 0, fmt.Errorf("failed to get current version: %w", err) + } + + if !version.Valid { + return 0, nil // No migrations applied yet + } + + return int(version.Int64), nil +} + +// ApplyMigrations applies all 
pending migrations +func (m *MigrationManager) ApplyMigrations() error { + currentVersion, err := m.GetCurrentVersion() + if err != nil { + return fmt.Errorf("failed to get current version: %w", err) + } + + migrations := GetMigrations() + + for _, migration := range migrations { + if migration.Version <= currentVersion { + continue // Already applied + } + + fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description) + + // Begin transaction + tx, err := m.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err) + } + + // Execute migration SQL + _, err = tx.Exec(migration.SQL) + if err != nil { + tx.Rollback() + return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err) + } + + // Record migration as applied + _, err = tx.Exec( + "INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)", + migration.Version, + migration.Description, + getCurrentTimestamp(), + ) + if err != nil { + tx.Rollback() + return fmt.Errorf("failed to record migration %d: %w", migration.Version, err) + } + + // Commit transaction + err = tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err) + } + + fmt.Printf("Successfully applied migration %d\n", migration.Version) + } + + return nil +} + +// RollbackMigration rolls back a specific migration (if supported) +func (m *MigrationManager) RollbackMigration(version int) error { + // TODO: Implement rollback functionality + // ASSUMPTION: For now, rollbacks are not supported as they require careful planning + return fmt.Errorf("migration rollbacks not implemented - manual intervention required") +} + +// GetAppliedMigrations returns a list of all applied migrations +func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) { + rows, err := m.db.Query(` + SELECT version, description, applied_at + FROM schema_migrations + ORDER BY version + `) + if err != nil { + return nil, fmt.Errorf("failed to query applied migrations: %w", err) + } + defer rows.Close() + + var migrations []AppliedMigration + for rows.Next() { + var migration AppliedMigration + err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt) + if err != nil { + return nil, fmt.Errorf("failed to scan migration: %w", err) + } + migrations = append(migrations, migration) + } + + return migrations, nil +} + +// ValidateSchema validates that the database schema is up to date +func (m *MigrationManager) ValidateSchema() error { + currentVersion, err := m.GetCurrentVersion() + if err != nil { + return fmt.Errorf("failed to get current version: %w", err) + } + + migrations := GetMigrations() + if len(migrations) == 0 { + return nil + } + + latestVersion := migrations[len(migrations)-1].Version + if currentVersion < latestVersion { + return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion) + } + + return nil +} + +// AppliedMigration represents a migration that has been applied +type AppliedMigration struct { + Version int + Description string + AppliedAt int64 +} + +// getCurrentTimestamp returns the current timestamp in nanoseconds +func getCurrentTimestamp() int64 { + return time.Now().UnixNano() +} + +// CreateDatabase creates and initializes a new offset storage database +func CreateDatabase(dbPath string) (*sql.DB, error) { + // TODO: Support different database types (PostgreSQL, MySQL, etc.) 
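+ // NOTE: the caller must blank-import a SQLite driver (the tests use github.com/mattn/go-sqlite3)
+ // so that sql.Open("sqlite3", dbPath) below can resolve the driver.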
+ // ASSUMPTION: Using SQLite for now, can be extended for other databases + + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, fmt.Errorf("failed to open database: %w", err) + } + + // Configure SQLite for better performance + pragmas := []string{ + "PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency + "PRAGMA synchronous=NORMAL", // Balance between safety and performance + "PRAGMA cache_size=10000", // Increase cache size + "PRAGMA foreign_keys=ON", // Enable foreign key constraints + "PRAGMA temp_store=MEMORY", // Store temporary tables in memory + } + + for _, pragma := range pragmas { + _, err := db.Exec(pragma) + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err) + } + } + + // Apply migrations + migrationManager := NewMigrationManager(db) + err = migrationManager.ApplyMigrations() + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to apply migrations: %w", err) + } + + return db, nil +} + +// BackupDatabase creates a backup of the offset storage database +func BackupDatabase(sourceDB *sql.DB, backupPath string) error { + // TODO: Implement database backup functionality + // ASSUMPTION: This would use database-specific backup mechanisms + return fmt.Errorf("database backup not implemented yet") +} + +// RestoreDatabase restores a database from a backup +func RestoreDatabase(backupPath, targetPath string) error { + // TODO: Implement database restore functionality + // ASSUMPTION: This would use database-specific restore mechanisms + return fmt.Errorf("database restore not implemented yet") +} diff --git a/weed/mq/offset/sql_storage.go b/weed/mq/offset/sql_storage.go new file mode 100644 index 000000000..c3107e5a4 --- /dev/null +++ b/weed/mq/offset/sql_storage.go @@ -0,0 +1,394 @@ +package offset + +import ( + "database/sql" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// OffsetEntry represents a mapping between Kafka offset and SMQ timestamp +type OffsetEntry struct { + KafkaOffset int64 + SMQTimestamp int64 + MessageSize int32 +} + +// SQLOffsetStorage implements OffsetStorage using SQL database with _index column +type SQLOffsetStorage struct { + db *sql.DB +} + +// NewSQLOffsetStorage creates a new SQL-based offset storage +func NewSQLOffsetStorage(db *sql.DB) (*SQLOffsetStorage, error) { + storage := &SQLOffsetStorage{db: db} + + // Initialize database schema + if err := storage.initializeSchema(); err != nil { + return nil, fmt.Errorf("failed to initialize schema: %w", err) + } + + return storage, nil +} + +// initializeSchema creates the necessary tables for offset storage +func (s *SQLOffsetStorage) initializeSchema() error { + // TODO: Create offset storage tables with _index as hidden column + // ASSUMPTION: Using SQLite-compatible syntax, may need adaptation for other databases + + queries := []string{ + // Partition offset checkpoints table + // TODO: Add _index as computed column when supported by database + // ASSUMPTION: Using regular columns for now, _index concept preserved for future enhancement + `CREATE TABLE IF NOT EXISTS partition_offset_checkpoints ( + partition_key TEXT PRIMARY KEY, + ring_size INTEGER NOT NULL, + range_start INTEGER NOT NULL, + range_stop INTEGER NOT NULL, + unix_time_ns INTEGER NOT NULL, + checkpoint_offset INTEGER NOT NULL, + updated_at INTEGER NOT NULL + )`, + + // Offset mappings table for detailed tracking + // TODO: Add _index as computed column when supported by database + `CREATE TABLE IF NOT EXISTS 
offset_mappings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + partition_key TEXT NOT NULL, + kafka_offset INTEGER NOT NULL, + smq_timestamp INTEGER NOT NULL, + message_size INTEGER NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE(partition_key, kafka_offset) + )`, + + // Indexes for performance + `CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition + ON partition_offset_checkpoints(partition_key)`, + + `CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset + ON offset_mappings(partition_key, kafka_offset)`, + + `CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp + ON offset_mappings(partition_key, smq_timestamp)`, + } + + for _, query := range queries { + if _, err := s.db.Exec(query); err != nil { + return fmt.Errorf("failed to execute schema query: %w", err) + } + } + + return nil +} + +// SaveCheckpoint saves the checkpoint for a partition +func (s *SQLOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error { + // Use TopicPartitionKey to ensure each topic has isolated checkpoint storage + partitionKey := TopicPartitionKey(namespace, topicName, partition) + now := time.Now().UnixNano() + + // TODO: Use UPSERT for better performance + // ASSUMPTION: SQLite REPLACE syntax, may need adaptation for other databases + query := ` + REPLACE INTO partition_offset_checkpoints + (partition_key, ring_size, range_start, range_stop, unix_time_ns, checkpoint_offset, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ` + + _, err := s.db.Exec(query, + partitionKey, + partition.RingSize, + partition.RangeStart, + partition.RangeStop, + partition.UnixTimeNs, + offset, + now, + ) + + if err != nil { + return fmt.Errorf("failed to save checkpoint: %w", err) + } + + return nil +} + +// LoadCheckpoint loads the checkpoint for a partition +func (s *SQLOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + // Use TopicPartitionKey to match SaveCheckpoint + partitionKey := TopicPartitionKey(namespace, topicName, partition) + + query := ` + SELECT checkpoint_offset + FROM partition_offset_checkpoints + WHERE partition_key = ? + ` + + var checkpointOffset int64 + err := s.db.QueryRow(query, partitionKey).Scan(&checkpointOffset) + + if err == sql.ErrNoRows { + return -1, fmt.Errorf("no checkpoint found") + } + + if err != nil { + return -1, fmt.Errorf("failed to load checkpoint: %w", err) + } + + return checkpointOffset, nil +} + +// GetHighestOffset finds the highest offset in storage for a partition +func (s *SQLOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) { + // Use TopicPartitionKey to match SaveCheckpoint + partitionKey := TopicPartitionKey(namespace, topicName, partition) + + // TODO: Use _index column for efficient querying + // ASSUMPTION: kafka_offset represents the sequential offset we're tracking + query := ` + SELECT MAX(kafka_offset) + FROM offset_mappings + WHERE partition_key = ? 
+ ` + + var highestOffset sql.NullInt64 + err := s.db.QueryRow(query, partitionKey).Scan(&highestOffset) + + if err != nil { + return -1, fmt.Errorf("failed to get highest offset: %w", err) + } + + if !highestOffset.Valid { + return -1, fmt.Errorf("no records found") + } + + return highestOffset.Int64, nil +} + +// SaveOffsetMapping stores an offset mapping (extends OffsetStorage interface) +func (s *SQLOffsetStorage) SaveOffsetMapping(partitionKey string, kafkaOffset, smqTimestamp int64, size int32) error { + now := time.Now().UnixNano() + + // TODO: Handle duplicate key conflicts gracefully + // ASSUMPTION: Using INSERT OR REPLACE for conflict resolution + query := ` + INSERT OR REPLACE INTO offset_mappings + (partition_key, kafka_offset, smq_timestamp, message_size, created_at) + VALUES (?, ?, ?, ?, ?) + ` + + _, err := s.db.Exec(query, partitionKey, kafkaOffset, smqTimestamp, size, now) + if err != nil { + return fmt.Errorf("failed to save offset mapping: %w", err) + } + + return nil +} + +// LoadOffsetMappings retrieves all offset mappings for a partition +func (s *SQLOffsetStorage) LoadOffsetMappings(partitionKey string) ([]OffsetEntry, error) { + // TODO: Add pagination for large result sets + // ASSUMPTION: Loading all mappings for now, should be paginated in production + query := ` + SELECT kafka_offset, smq_timestamp, message_size + FROM offset_mappings + WHERE partition_key = ? + ORDER BY kafka_offset ASC + ` + + rows, err := s.db.Query(query, partitionKey) + if err != nil { + return nil, fmt.Errorf("failed to query offset mappings: %w", err) + } + defer rows.Close() + + var entries []OffsetEntry + for rows.Next() { + var entry OffsetEntry + err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize) + if err != nil { + return nil, fmt.Errorf("failed to scan offset entry: %w", err) + } + entries = append(entries, entry) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating offset mappings: %w", err) + } + + return entries, nil +} + +// GetOffsetMappingsByRange retrieves offset mappings within a specific range +func (s *SQLOffsetStorage) GetOffsetMappingsByRange(partitionKey string, startOffset, endOffset int64) ([]OffsetEntry, error) { + // TODO: Use _index column for efficient range queries + query := ` + SELECT kafka_offset, smq_timestamp, message_size + FROM offset_mappings + WHERE partition_key = ? AND kafka_offset >= ? AND kafka_offset <= ? + ORDER BY kafka_offset ASC + ` + + rows, err := s.db.Query(query, partitionKey, startOffset, endOffset) + if err != nil { + return nil, fmt.Errorf("failed to query offset range: %w", err) + } + defer rows.Close() + + var entries []OffsetEntry + for rows.Next() { + var entry OffsetEntry + err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize) + if err != nil { + return nil, fmt.Errorf("failed to scan offset entry: %w", err) + } + entries = append(entries, entry) + } + + return entries, nil +} + +// GetPartitionStats returns statistics about a partition's offset usage +func (s *SQLOffsetStorage) GetPartitionStats(partitionKey string) (*PartitionStats, error) { + query := ` + SELECT + COUNT(*) as record_count, + MIN(kafka_offset) as earliest_offset, + MAX(kafka_offset) as latest_offset, + SUM(message_size) as total_size, + MIN(created_at) as first_record_time, + MAX(created_at) as last_record_time + FROM offset_mappings + WHERE partition_key = ? 
+ ` + + var stats PartitionStats + var earliestOffset, latestOffset sql.NullInt64 + var totalSize sql.NullInt64 + var firstRecordTime, lastRecordTime sql.NullInt64 + + err := s.db.QueryRow(query, partitionKey).Scan( + &stats.RecordCount, + &earliestOffset, + &latestOffset, + &totalSize, + &firstRecordTime, + &lastRecordTime, + ) + + if err != nil { + return nil, fmt.Errorf("failed to get partition stats: %w", err) + } + + stats.PartitionKey = partitionKey + + if earliestOffset.Valid { + stats.EarliestOffset = earliestOffset.Int64 + } else { + stats.EarliestOffset = -1 + } + + if latestOffset.Valid { + stats.LatestOffset = latestOffset.Int64 + stats.HighWaterMark = latestOffset.Int64 + 1 + } else { + stats.LatestOffset = -1 + stats.HighWaterMark = 0 + } + + if firstRecordTime.Valid { + stats.FirstRecordTime = firstRecordTime.Int64 + } + + if lastRecordTime.Valid { + stats.LastRecordTime = lastRecordTime.Int64 + } + + if totalSize.Valid { + stats.TotalSize = totalSize.Int64 + } + + return &stats, nil +} + +// CleanupOldMappings removes offset mappings older than the specified time +func (s *SQLOffsetStorage) CleanupOldMappings(olderThanNs int64) error { + // TODO: Add configurable cleanup policies + // ASSUMPTION: Simple time-based cleanup, could be enhanced with retention policies + query := ` + DELETE FROM offset_mappings + WHERE created_at < ? + ` + + result, err := s.db.Exec(query, olderThanNs) + if err != nil { + return fmt.Errorf("failed to cleanup old mappings: %w", err) + } + + rowsAffected, _ := result.RowsAffected() + if rowsAffected > 0 { + // Log cleanup activity + fmt.Printf("Cleaned up %d old offset mappings\n", rowsAffected) + } + + return nil +} + +// Close closes the database connection +func (s *SQLOffsetStorage) Close() error { + if s.db != nil { + return s.db.Close() + } + return nil +} + +// PartitionStats provides statistics about a partition's offset usage +type PartitionStats struct { + PartitionKey string + RecordCount int64 + EarliestOffset int64 + LatestOffset int64 + HighWaterMark int64 + TotalSize int64 + FirstRecordTime int64 + LastRecordTime int64 +} + +// GetAllPartitions returns a list of all partitions with offset data +func (s *SQLOffsetStorage) GetAllPartitions() ([]string, error) { + query := ` + SELECT DISTINCT partition_key + FROM offset_mappings + ORDER BY partition_key + ` + + rows, err := s.db.Query(query) + if err != nil { + return nil, fmt.Errorf("failed to get all partitions: %w", err) + } + defer rows.Close() + + var partitions []string + for rows.Next() { + var partitionKey string + if err := rows.Scan(&partitionKey); err != nil { + return nil, fmt.Errorf("failed to scan partition key: %w", err) + } + partitions = append(partitions, partitionKey) + } + + return partitions, nil +} + +// Vacuum performs database maintenance operations +func (s *SQLOffsetStorage) Vacuum() error { + // TODO: Add database-specific optimization commands + // ASSUMPTION: SQLite VACUUM command, may need adaptation for other databases + _, err := s.db.Exec("VACUUM") + if err != nil { + return fmt.Errorf("failed to vacuum database: %w", err) + } + + return nil +} diff --git a/weed/mq/offset/sql_storage_test.go b/weed/mq/offset/sql_storage_test.go new file mode 100644 index 000000000..661f317de --- /dev/null +++ b/weed/mq/offset/sql_storage_test.go @@ -0,0 +1,516 @@ +package offset + +import ( + "database/sql" + "os" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" // SQLite driver + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func createTestDB(t 
*testing.T) *sql.DB { + // Create temporary database file + tmpFile, err := os.CreateTemp("", "offset_test_*.db") + if err != nil { + t.Fatalf("Failed to create temp database file: %v", err) + } + tmpFile.Close() + + // Clean up the file when test completes + t.Cleanup(func() { + os.Remove(tmpFile.Name()) + }) + + db, err := sql.Open("sqlite3", tmpFile.Name()) + if err != nil { + t.Fatalf("Failed to open database: %v", err) + } + + t.Cleanup(func() { + db.Close() + }) + + return db +} + +func createTestPartitionForSQL() *schema_pb.Partition { + return &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: time.Now().UnixNano(), + } +} + +func TestSQLOffsetStorage_InitializeSchema(t *testing.T) { + db := createTestDB(t) + + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + // Verify tables were created + tables := []string{ + "partition_offset_checkpoints", + "offset_mappings", + } + + for _, table := range tables { + var count int + err := db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?", table).Scan(&count) + if err != nil { + t.Fatalf("Failed to check table %s: %v", table, err) + } + + if count != 1 { + t.Errorf("Table %s was not created", table) + } + } +} + +func TestSQLOffsetStorage_SaveLoadCheckpoint(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + + // Test saving checkpoint + err = storage.SaveCheckpoint("test-namespace", "test-topic", partition, 100) + if err != nil { + t.Fatalf("Failed to save checkpoint: %v", err) + } + + // Test loading checkpoint + checkpoint, err := storage.LoadCheckpoint("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to load checkpoint: %v", err) + } + + if checkpoint != 100 { + t.Errorf("Expected checkpoint 100, got %d", checkpoint) + } + + // Test updating checkpoint + err = storage.SaveCheckpoint("test-namespace", "test-topic", partition, 200) + if err != nil { + t.Fatalf("Failed to update checkpoint: %v", err) + } + + checkpoint, err = storage.LoadCheckpoint("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to load updated checkpoint: %v", err) + } + + if checkpoint != 200 { + t.Errorf("Expected updated checkpoint 200, got %d", checkpoint) + } +} + +func TestSQLOffsetStorage_LoadCheckpointNotFound(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + + // Test loading non-existent checkpoint + _, err = storage.LoadCheckpoint("test-namespace", "test-topic", partition) + if err == nil { + t.Error("Expected error for non-existent checkpoint") + } +} + +func TestSQLOffsetStorage_SaveLoadOffsetMappings(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + partitionKey := partitionKey(partition) + + // Save multiple offset mappings + mappings := []struct { + offset int64 + timestamp int64 + size int32 + }{ + {0, 1000, 100}, + {1, 2000, 150}, + {2, 3000, 200}, + } + + for _, mapping := range mappings { + err := 
storage.SaveOffsetMapping(partitionKey, mapping.offset, mapping.timestamp, mapping.size) + if err != nil { + t.Fatalf("Failed to save offset mapping: %v", err) + } + } + + // Load offset mappings + entries, err := storage.LoadOffsetMappings(partitionKey) + if err != nil { + t.Fatalf("Failed to load offset mappings: %v", err) + } + + if len(entries) != len(mappings) { + t.Errorf("Expected %d entries, got %d", len(mappings), len(entries)) + } + + // Verify entries are sorted by offset + for i, entry := range entries { + expected := mappings[i] + if entry.KafkaOffset != expected.offset { + t.Errorf("Entry %d: expected offset %d, got %d", i, expected.offset, entry.KafkaOffset) + } + if entry.SMQTimestamp != expected.timestamp { + t.Errorf("Entry %d: expected timestamp %d, got %d", i, expected.timestamp, entry.SMQTimestamp) + } + if entry.MessageSize != expected.size { + t.Errorf("Entry %d: expected size %d, got %d", i, expected.size, entry.MessageSize) + } + } +} + +func TestSQLOffsetStorage_GetHighestOffset(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + partitionKey := TopicPartitionKey("test-namespace", "test-topic", partition) + + // Test empty partition + _, err = storage.GetHighestOffset("test-namespace", "test-topic", partition) + if err == nil { + t.Error("Expected error for empty partition") + } + + // Add some offset mappings + offsets := []int64{5, 1, 3, 2, 4} + for _, offset := range offsets { + err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100) + if err != nil { + t.Fatalf("Failed to save offset mapping: %v", err) + } + } + + // Get highest offset + highest, err := storage.GetHighestOffset("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get highest offset: %v", err) + } + + if highest != 5 { + t.Errorf("Expected highest offset 5, got %d", highest) + } +} + +func TestSQLOffsetStorage_GetOffsetMappingsByRange(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + partitionKey := partitionKey(partition) + + // Add offset mappings + for i := int64(0); i < 10; i++ { + err := storage.SaveOffsetMapping(partitionKey, i, i*1000, 100) + if err != nil { + t.Fatalf("Failed to save offset mapping: %v", err) + } + } + + // Get range of offsets + entries, err := storage.GetOffsetMappingsByRange(partitionKey, 3, 7) + if err != nil { + t.Fatalf("Failed to get offset range: %v", err) + } + + expectedCount := 5 // offsets 3, 4, 5, 6, 7 + if len(entries) != expectedCount { + t.Errorf("Expected %d entries, got %d", expectedCount, len(entries)) + } + + // Verify range + for i, entry := range entries { + expectedOffset := int64(3 + i) + if entry.KafkaOffset != expectedOffset { + t.Errorf("Entry %d: expected offset %d, got %d", i, expectedOffset, entry.KafkaOffset) + } + } +} + +func TestSQLOffsetStorage_GetPartitionStats(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + partitionKey := partitionKey(partition) + + // Test empty partition stats + stats, err := storage.GetPartitionStats(partitionKey) + if err != nil { + 
t.Fatalf("Failed to get empty partition stats: %v", err) + } + + if stats.RecordCount != 0 { + t.Errorf("Expected record count 0, got %d", stats.RecordCount) + } + + if stats.EarliestOffset != -1 { + t.Errorf("Expected earliest offset -1, got %d", stats.EarliestOffset) + } + + // Add some data + sizes := []int32{100, 150, 200} + for i, size := range sizes { + err := storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), size) + if err != nil { + t.Fatalf("Failed to save offset mapping: %v", err) + } + } + + // Get stats with data + stats, err = storage.GetPartitionStats(partitionKey) + if err != nil { + t.Fatalf("Failed to get partition stats: %v", err) + } + + if stats.RecordCount != 3 { + t.Errorf("Expected record count 3, got %d", stats.RecordCount) + } + + if stats.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", stats.EarliestOffset) + } + + if stats.LatestOffset != 2 { + t.Errorf("Expected latest offset 2, got %d", stats.LatestOffset) + } + + if stats.HighWaterMark != 3 { + t.Errorf("Expected high water mark 3, got %d", stats.HighWaterMark) + } + + expectedTotalSize := int64(100 + 150 + 200) + if stats.TotalSize != expectedTotalSize { + t.Errorf("Expected total size %d, got %d", expectedTotalSize, stats.TotalSize) + } +} + +func TestSQLOffsetStorage_GetAllPartitions(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + // Test empty database + partitions, err := storage.GetAllPartitions() + if err != nil { + t.Fatalf("Failed to get all partitions: %v", err) + } + + if len(partitions) != 0 { + t.Errorf("Expected 0 partitions, got %d", len(partitions)) + } + + // Add data for multiple partitions + partition1 := createTestPartitionForSQL() + partition2 := &schema_pb.Partition{ + RingSize: 1024, + RangeStart: 32, + RangeStop: 63, + UnixTimeNs: time.Now().UnixNano(), + } + + partitionKey1 := partitionKey(partition1) + partitionKey2 := partitionKey(partition2) + + storage.SaveOffsetMapping(partitionKey1, 0, 1000, 100) + storage.SaveOffsetMapping(partitionKey2, 0, 2000, 150) + + // Get all partitions + partitions, err = storage.GetAllPartitions() + if err != nil { + t.Fatalf("Failed to get all partitions: %v", err) + } + + if len(partitions) != 2 { + t.Errorf("Expected 2 partitions, got %d", len(partitions)) + } + + // Verify partition keys are present + partitionMap := make(map[string]bool) + for _, p := range partitions { + partitionMap[p] = true + } + + if !partitionMap[partitionKey1] { + t.Errorf("Partition key %s not found", partitionKey1) + } + + if !partitionMap[partitionKey2] { + t.Errorf("Partition key %s not found", partitionKey2) + } +} + +func TestSQLOffsetStorage_CleanupOldMappings(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + partitionKey := partitionKey(partition) + + // Add mappings with different timestamps + now := time.Now().UnixNano() + + // Add old mapping by directly inserting with old timestamp + oldTime := now - (24 * time.Hour).Nanoseconds() // 24 hours ago + _, err = db.Exec(` + INSERT INTO offset_mappings + (partition_key, kafka_offset, smq_timestamp, message_size, created_at) + VALUES (?, ?, ?, ?, ?) 
+ `, partitionKey, 0, oldTime, 100, oldTime) + if err != nil { + t.Fatalf("Failed to insert old mapping: %v", err) + } + + // Add recent mapping + storage.SaveOffsetMapping(partitionKey, 1, now, 150) + + // Verify both mappings exist + entries, err := storage.LoadOffsetMappings(partitionKey) + if err != nil { + t.Fatalf("Failed to load mappings: %v", err) + } + + if len(entries) != 2 { + t.Errorf("Expected 2 mappings before cleanup, got %d", len(entries)) + } + + // Cleanup old mappings (older than 12 hours) + cutoffTime := now - (12 * time.Hour).Nanoseconds() + err = storage.CleanupOldMappings(cutoffTime) + if err != nil { + t.Fatalf("Failed to cleanup old mappings: %v", err) + } + + // Verify only recent mapping remains + entries, err = storage.LoadOffsetMappings(partitionKey) + if err != nil { + t.Fatalf("Failed to load mappings after cleanup: %v", err) + } + + if len(entries) != 1 { + t.Errorf("Expected 1 mapping after cleanup, got %d", len(entries)) + } + + if entries[0].KafkaOffset != 1 { + t.Errorf("Expected remaining mapping offset 1, got %d", entries[0].KafkaOffset) + } +} + +func TestSQLOffsetStorage_Vacuum(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + // Vacuum should not fail on empty database + err = storage.Vacuum() + if err != nil { + t.Fatalf("Failed to vacuum database: %v", err) + } + + // Add some data and vacuum again + partition := createTestPartitionForSQL() + partitionKey := partitionKey(partition) + storage.SaveOffsetMapping(partitionKey, 0, 1000, 100) + + err = storage.Vacuum() + if err != nil { + t.Fatalf("Failed to vacuum database with data: %v", err) + } +} + +func TestSQLOffsetStorage_ConcurrentAccess(t *testing.T) { + db := createTestDB(t) + storage, err := NewSQLOffsetStorage(db) + if err != nil { + t.Fatalf("Failed to create SQL storage: %v", err) + } + defer storage.Close() + + partition := createTestPartitionForSQL() + partitionKey := partitionKey(partition) + + // Test concurrent writes + const numGoroutines = 10 + const offsetsPerGoroutine = 10 + + done := make(chan bool, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(goroutineID int) { + defer func() { done <- true }() + + for j := 0; j < offsetsPerGoroutine; j++ { + offset := int64(goroutineID*offsetsPerGoroutine + j) + err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100) + if err != nil { + t.Errorf("Failed to save offset mapping %d: %v", offset, err) + return + } + } + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines; i++ { + <-done + } + + // Verify all mappings were saved + entries, err := storage.LoadOffsetMappings(partitionKey) + if err != nil { + t.Fatalf("Failed to load mappings: %v", err) + } + + expectedCount := numGoroutines * offsetsPerGoroutine + if len(entries) != expectedCount { + t.Errorf("Expected %d mappings, got %d", expectedCount, len(entries)) + } +} diff --git a/weed/mq/offset/storage.go b/weed/mq/offset/storage.go new file mode 100644 index 000000000..b3eaddd6b --- /dev/null +++ b/weed/mq/offset/storage.go @@ -0,0 +1,5 @@ +package offset + +// Note: OffsetStorage interface is defined in manager.go +// Production implementations: FilerOffsetStorage (filer_storage.go), SQLOffsetStorage (sql_storage.go) +// Test implementation: InMemoryOffsetStorage (storage_test.go) diff --git a/weed/mq/offset/subscriber.go b/weed/mq/offset/subscriber.go new file mode 100644 index 
000000000..d39932aae --- /dev/null +++ b/weed/mq/offset/subscriber.go @@ -0,0 +1,355 @@ +package offset + +import ( + "fmt" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// OffsetSubscriber handles offset-based subscription logic +type OffsetSubscriber struct { + mu sync.RWMutex + offsetRegistry *PartitionOffsetRegistry + subscriptions map[string]*OffsetSubscription +} + +// OffsetSubscription represents an active offset-based subscription +type OffsetSubscription struct { + ID string + Namespace string + TopicName string + Partition *schema_pb.Partition + StartOffset int64 + CurrentOffset int64 + OffsetType schema_pb.OffsetType + IsActive bool + offsetRegistry *PartitionOffsetRegistry +} + +// NewOffsetSubscriber creates a new offset-based subscriber +func NewOffsetSubscriber(offsetRegistry *PartitionOffsetRegistry) *OffsetSubscriber { + return &OffsetSubscriber{ + offsetRegistry: offsetRegistry, + subscriptions: make(map[string]*OffsetSubscription), + } +} + +// CreateSubscription creates a new offset-based subscription +func (s *OffsetSubscriber) CreateSubscription( + subscriptionID string, + namespace, topicName string, + partition *schema_pb.Partition, + offsetType schema_pb.OffsetType, + startOffset int64, +) (*OffsetSubscription, error) { + + s.mu.Lock() + defer s.mu.Unlock() + + // Check if subscription already exists + if _, exists := s.subscriptions[subscriptionID]; exists { + return nil, fmt.Errorf("subscription %s already exists", subscriptionID) + } + + // Resolve the actual start offset based on type + actualStartOffset, err := s.resolveStartOffset(namespace, topicName, partition, offsetType, startOffset) + if err != nil { + return nil, fmt.Errorf("failed to resolve start offset: %w", err) + } + + subscription := &OffsetSubscription{ + ID: subscriptionID, + Namespace: namespace, + TopicName: topicName, + Partition: partition, + StartOffset: actualStartOffset, + CurrentOffset: actualStartOffset, + OffsetType: offsetType, + IsActive: true, + offsetRegistry: s.offsetRegistry, + } + + s.subscriptions[subscriptionID] = subscription + return subscription, nil +} + +// GetSubscription retrieves an existing subscription +func (s *OffsetSubscriber) GetSubscription(subscriptionID string) (*OffsetSubscription, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + subscription, exists := s.subscriptions[subscriptionID] + if !exists { + return nil, fmt.Errorf("subscription %s not found", subscriptionID) + } + + return subscription, nil +} + +// CloseSubscription closes and removes a subscription +func (s *OffsetSubscriber) CloseSubscription(subscriptionID string) error { + s.mu.Lock() + defer s.mu.Unlock() + + subscription, exists := s.subscriptions[subscriptionID] + if !exists { + return fmt.Errorf("subscription %s not found", subscriptionID) + } + + subscription.IsActive = false + delete(s.subscriptions, subscriptionID) + return nil +} + +// resolveStartOffset resolves the actual start offset based on OffsetType +func (s *OffsetSubscriber) resolveStartOffset( + namespace, topicName string, + partition *schema_pb.Partition, + offsetType schema_pb.OffsetType, + requestedOffset int64, +) (int64, error) { + + switch offsetType { + case schema_pb.OffsetType_EXACT_OFFSET: + // Validate that the requested offset exists + return s.validateAndGetOffset(namespace, topicName, partition, requestedOffset) + + case schema_pb.OffsetType_RESET_TO_OFFSET: + // Use the requested offset, even if it doesn't exist yet + return requestedOffset, nil + + case 
schema_pb.OffsetType_RESET_TO_EARLIEST: + // Start from offset 0 + return 0, nil + + case schema_pb.OffsetType_RESET_TO_LATEST: + // Start from the current high water mark + hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return 0, err + } + return hwm, nil + + case schema_pb.OffsetType_RESUME_OR_EARLIEST: + // Try to resume from a saved position, fallback to earliest + // For now, just use earliest (consumer group position tracking will be added later) + return 0, nil + + case schema_pb.OffsetType_RESUME_OR_LATEST: + // Try to resume from a saved position, fallback to latest + // For now, just use latest + hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return 0, err + } + return hwm, nil + + default: + return 0, fmt.Errorf("unsupported offset type: %v", offsetType) + } +} + +// validateAndGetOffset validates that an offset exists and returns it +func (s *OffsetSubscriber) validateAndGetOffset(namespace, topicName string, partition *schema_pb.Partition, offset int64) (int64, error) { + if offset < 0 { + return 0, fmt.Errorf("offset cannot be negative: %d", offset) + } + + // Get the current high water mark + hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return 0, fmt.Errorf("failed to get high water mark: %w", err) + } + + // Check if offset is within valid range + if offset >= hwm { + return 0, fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm) + } + + return offset, nil +} + +// SeekToOffset seeks a subscription to a specific offset +func (sub *OffsetSubscription) SeekToOffset(offset int64) error { + if !sub.IsActive { + return fmt.Errorf("subscription is not active") + } + + // Validate the offset + if offset < 0 { + return fmt.Errorf("offset cannot be negative: %d", offset) + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition) + if err != nil { + return fmt.Errorf("failed to get high water mark: %w", err) + } + + if offset > hwm { + return fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm) + } + + sub.CurrentOffset = offset + return nil +} + +// GetNextOffset returns the next offset to read +func (sub *OffsetSubscription) GetNextOffset() int64 { + return sub.CurrentOffset +} + +// AdvanceOffset advances the subscription to the next offset +func (sub *OffsetSubscription) AdvanceOffset() { + sub.CurrentOffset++ +} + +// GetLag returns the lag between current position and high water mark +func (sub *OffsetSubscription) GetLag() (int64, error) { + if !sub.IsActive { + return 0, fmt.Errorf("subscription is not active") + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition) + if err != nil { + return 0, fmt.Errorf("failed to get high water mark: %w", err) + } + + lag := hwm - sub.CurrentOffset + if lag < 0 { + lag = 0 + } + + return lag, nil +} + +// IsAtEnd checks if the subscription has reached the end of available data +func (sub *OffsetSubscription) IsAtEnd() (bool, error) { + if !sub.IsActive { + return true, fmt.Errorf("subscription is not active") + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition) + if err != nil { + return false, fmt.Errorf("failed to get high water mark: %w", err) + } + + return sub.CurrentOffset >= hwm, nil +} + +// OffsetRange represents a range of offsets +type OffsetRange struct { + StartOffset int64 + EndOffset int64 + Count int64 +} + 
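+// Note: OffsetRange bounds are inclusive; EndOffset is the last readable
+// offset, and an empty range is reported as Count == 0 with EndOffset set
+// to StartOffset - 1 (see GetOffsetRange and GetAvailableOffsetRange below).
+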
+// GetOffsetRange returns a range of offsets for batch reading +func (sub *OffsetSubscription) GetOffsetRange(maxCount int64) (*OffsetRange, error) { + if !sub.IsActive { + return nil, fmt.Errorf("subscription is not active") + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + startOffset := sub.CurrentOffset + endOffset := startOffset + maxCount - 1 + + // Don't go beyond high water mark + if endOffset >= hwm { + endOffset = hwm - 1 + } + + // If start is already at or beyond HWM, return empty range + if startOffset >= hwm { + return &OffsetRange{ + StartOffset: startOffset, + EndOffset: startOffset - 1, // Empty range + Count: 0, + }, nil + } + + count := endOffset - startOffset + 1 + return &OffsetRange{ + StartOffset: startOffset, + EndOffset: endOffset, + Count: count, + }, nil +} + +// AdvanceOffsetBy advances the subscription by a specific number of offsets +func (sub *OffsetSubscription) AdvanceOffsetBy(count int64) { + sub.CurrentOffset += count +} + +// OffsetSeeker provides utilities for offset-based seeking +type OffsetSeeker struct { + offsetRegistry *PartitionOffsetRegistry +} + +// NewOffsetSeeker creates a new offset seeker +func NewOffsetSeeker(offsetRegistry *PartitionOffsetRegistry) *OffsetSeeker { + return &OffsetSeeker{ + offsetRegistry: offsetRegistry, + } +} + +// SeekToTimestamp finds the offset closest to a given timestamp +// This bridges offset-based and timestamp-based seeking +func (seeker *OffsetSeeker) SeekToTimestamp(partition *schema_pb.Partition, timestamp int64) (int64, error) { + // TODO: This requires integration with the storage layer to map timestamps to offsets + // For now, return an error indicating this feature needs implementation + return 0, fmt.Errorf("timestamp-to-offset mapping not implemented yet") +} + +// ValidateOffsetRange validates that an offset range is valid +func (seeker *OffsetSeeker) ValidateOffsetRange(namespace, topicName string, partition *schema_pb.Partition, startOffset, endOffset int64) error { + if startOffset < 0 { + return fmt.Errorf("start offset cannot be negative: %d", startOffset) + } + + if endOffset < startOffset { + return fmt.Errorf("end offset %d cannot be less than start offset %d", endOffset, startOffset) + } + + hwm, err := seeker.offsetRegistry.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return fmt.Errorf("failed to get high water mark: %w", err) + } + + if startOffset >= hwm { + return fmt.Errorf("start offset %d is beyond high water mark %d", startOffset, hwm) + } + + if endOffset >= hwm { + return fmt.Errorf("end offset %d is beyond high water mark %d", endOffset, hwm) + } + + return nil +} + +// GetAvailableOffsetRange returns the range of available offsets for a partition +func (seeker *OffsetSeeker) GetAvailableOffsetRange(namespace, topicName string, partition *schema_pb.Partition) (*OffsetRange, error) { + hwm, err := seeker.offsetRegistry.GetHighWaterMark(namespace, topicName, partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + if hwm == 0 { + // No data available + return &OffsetRange{ + StartOffset: 0, + EndOffset: -1, + Count: 0, + }, nil + } + + return &OffsetRange{ + StartOffset: 0, + EndOffset: hwm - 1, + Count: hwm, + }, nil +} diff --git a/weed/mq/offset/subscriber_test.go b/weed/mq/offset/subscriber_test.go new file mode 100644 index 000000000..1ab97dadc --- /dev/null +++ 
b/weed/mq/offset/subscriber_test.go @@ -0,0 +1,457 @@ +package offset + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestOffsetSubscriber_CreateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign some offsets first + registry.AssignOffsets("test-namespace", "test-topic", partition, 10) + + // Test EXACT_OFFSET subscription + sub, err := subscriber.CreateSubscription("test-sub-1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 5) + if err != nil { + t.Fatalf("Failed to create EXACT_OFFSET subscription: %v", err) + } + + if sub.StartOffset != 5 { + t.Errorf("Expected start offset 5, got %d", sub.StartOffset) + } + if sub.CurrentOffset != 5 { + t.Errorf("Expected current offset 5, got %d", sub.CurrentOffset) + } + + // Test RESET_TO_LATEST subscription + sub2, err := subscriber.CreateSubscription("test-sub-2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0) + if err != nil { + t.Fatalf("Failed to create RESET_TO_LATEST subscription: %v", err) + } + + if sub2.StartOffset != 10 { // Should be at high water mark + t.Errorf("Expected start offset 10, got %d", sub2.StartOffset) + } +} + +func TestOffsetSubscriber_InvalidSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign some offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 5) + + // Test invalid offset (beyond high water mark) + _, err := subscriber.CreateSubscription("invalid-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 10) + if err == nil { + t.Error("Expected error for offset beyond high water mark") + } + + // Test negative offset + _, err = subscriber.CreateSubscription("invalid-sub-2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, -1) + if err == nil { + t.Error("Expected error for negative offset") + } +} + +func TestOffsetSubscriber_DuplicateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create first subscription + _, err := subscriber.CreateSubscription("duplicate-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create first subscription: %v", err) + } + + // Try to create duplicate + _, err = subscriber.CreateSubscription("duplicate-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err == nil { + t.Error("Expected error for duplicate subscription ID") + } +} + +func TestOffsetSubscription_SeekToOffset(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 20) + + // Create subscription + sub, err := subscriber.CreateSubscription("seek-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + 
// Test valid seek + err = sub.SeekToOffset(10) + if err != nil { + t.Fatalf("Failed to seek to offset 10: %v", err) + } + + if sub.CurrentOffset != 10 { + t.Errorf("Expected current offset 10, got %d", sub.CurrentOffset) + } + + // Test invalid seek (beyond high water mark) + err = sub.SeekToOffset(25) + if err == nil { + t.Error("Expected error for seek beyond high water mark") + } + + // Test negative seek + err = sub.SeekToOffset(-1) + if err == nil { + t.Error("Expected error for negative seek offset") + } +} + +func TestOffsetSubscription_AdvanceOffset(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create subscription + sub, err := subscriber.CreateSubscription("advance-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Test single advance + initialOffset := sub.GetNextOffset() + sub.AdvanceOffset() + + if sub.GetNextOffset() != initialOffset+1 { + t.Errorf("Expected offset %d, got %d", initialOffset+1, sub.GetNextOffset()) + } + + // Test batch advance + sub.AdvanceOffsetBy(5) + + if sub.GetNextOffset() != initialOffset+6 { + t.Errorf("Expected offset %d, got %d", initialOffset+6, sub.GetNextOffset()) + } +} + +func TestOffsetSubscription_GetLag(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 15) + + // Create subscription at offset 5 + sub, err := subscriber.CreateSubscription("lag-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 5) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Check initial lag + lag, err := sub.GetLag() + if err != nil { + t.Fatalf("Failed to get lag: %v", err) + } + + expectedLag := int64(15 - 5) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d, got %d", expectedLag, lag) + } + + // Advance and check lag again + sub.AdvanceOffsetBy(3) + + lag, err = sub.GetLag() + if err != nil { + t.Fatalf("Failed to get lag after advance: %v", err) + } + + expectedLag = int64(15 - 8) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag) + } +} + +func TestOffsetSubscription_IsAtEnd(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 10) + + // Create subscription at end + sub, err := subscriber.CreateSubscription("end-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Should be at end + atEnd, err := sub.IsAtEnd() + if err != nil { + t.Fatalf("Failed to check if at end: %v", err) + } + + if !atEnd { + t.Error("Expected subscription to be at end") + } + + // Seek to middle and check again + sub.SeekToOffset(5) + + atEnd, err = sub.IsAtEnd() + if err != nil { + t.Fatalf("Failed to check if at end after seek: %v", err) + } + + if atEnd { + t.Error("Expected subscription not to be at end after 
seek") + } +} + +func TestOffsetSubscription_GetOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 20) + + // Create subscription + sub, err := subscriber.CreateSubscription("range-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 5) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Test normal range + offsetRange, err := sub.GetOffsetRange(10) + if err != nil { + t.Fatalf("Failed to get offset range: %v", err) + } + + if offsetRange.StartOffset != 5 { + t.Errorf("Expected start offset 5, got %d", offsetRange.StartOffset) + } + if offsetRange.EndOffset != 14 { + t.Errorf("Expected end offset 14, got %d", offsetRange.EndOffset) + } + if offsetRange.Count != 10 { + t.Errorf("Expected count 10, got %d", offsetRange.Count) + } + + // Test range that exceeds high water mark + sub.SeekToOffset(15) + offsetRange, err = sub.GetOffsetRange(10) + if err != nil { + t.Fatalf("Failed to get offset range near end: %v", err) + } + + if offsetRange.StartOffset != 15 { + t.Errorf("Expected start offset 15, got %d", offsetRange.StartOffset) + } + if offsetRange.EndOffset != 19 { // Should be capped at hwm-1 + t.Errorf("Expected end offset 19, got %d", offsetRange.EndOffset) + } + if offsetRange.Count != 5 { + t.Errorf("Expected count 5, got %d", offsetRange.Count) + } +} + +func TestOffsetSubscription_EmptyRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 10) + + // Create subscription at end + sub, err := subscriber.CreateSubscription("empty-range-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Request range when at end + offsetRange, err := sub.GetOffsetRange(5) + if err != nil { + t.Fatalf("Failed to get offset range at end: %v", err) + } + + if offsetRange.Count != 0 { + t.Errorf("Expected empty range (count 0), got count %d", offsetRange.Count) + } + + if offsetRange.StartOffset != 10 { + t.Errorf("Expected start offset 10, got %d", offsetRange.StartOffset) + } + + if offsetRange.EndOffset != 9 { // Empty range: end < start + t.Errorf("Expected end offset 9 (empty range), got %d", offsetRange.EndOffset) + } +} + +func TestOffsetSeeker_ValidateOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + seeker := NewOffsetSeeker(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets("test-namespace", "test-topic", partition, 15) + + // Test valid range + err := seeker.ValidateOffsetRange("test-namespace", "test-topic", partition, 5, 10) + if err != nil { + t.Errorf("Valid range should not return error: %v", err) + } + + // Test invalid ranges + testCases := []struct { + name string + startOffset int64 + endOffset int64 + expectError bool + }{ + {"negative start", -1, 5, true}, + {"end before start", 10, 5, true}, + {"start beyond hwm", 20, 25, true}, + {"valid range", 0, 14, false}, + {"single offset", 5, 5, false}, + } + + for _, tc := range testCases 
{ + t.Run(tc.name, func(t *testing.T) { + err := seeker.ValidateOffsetRange("test-namespace", "test-topic", partition, tc.startOffset, tc.endOffset) + if tc.expectError && err == nil { + t.Error("Expected error but got none") + } + if !tc.expectError && err != nil { + t.Errorf("Expected no error but got: %v", err) + } + }) + } +} + +func TestOffsetSeeker_GetAvailableOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + seeker := NewOffsetSeeker(registry) + partition := createTestPartition() + + // Test empty partition + offsetRange, err := seeker.GetAvailableOffsetRange("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get available range for empty partition: %v", err) + } + + if offsetRange.Count != 0 { + t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count) + } + + // Assign offsets and test again + registry.AssignOffsets("test-namespace", "test-topic", partition, 25) + + offsetRange, err = seeker.GetAvailableOffsetRange("test-namespace", "test-topic", partition) + if err != nil { + t.Fatalf("Failed to get available range: %v", err) + } + + if offsetRange.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset) + } + if offsetRange.EndOffset != 24 { + t.Errorf("Expected end offset 24, got %d", offsetRange.EndOffset) + } + if offsetRange.Count != 25 { + t.Errorf("Expected count 25, got %d", offsetRange.Count) + } +} + +func TestOffsetSubscriber_CloseSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create subscription + sub, err := subscriber.CreateSubscription("close-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Verify subscription exists + _, err = subscriber.GetSubscription("close-test") + if err != nil { + t.Fatalf("Subscription should exist: %v", err) + } + + // Close subscription + err = subscriber.CloseSubscription("close-test") + if err != nil { + t.Fatalf("Failed to close subscription: %v", err) + } + + // Verify subscription is gone + _, err = subscriber.GetSubscription("close-test") + if err == nil { + t.Error("Subscription should not exist after close") + } + + // Verify subscription is marked inactive + if sub.IsActive { + t.Error("Subscription should be marked inactive after close") + } +} + +func TestOffsetSubscription_InactiveOperations(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create and close subscription + sub, err := subscriber.CreateSubscription("inactive-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + subscriber.CloseSubscription("inactive-test") + + // Test operations on inactive subscription + err = sub.SeekToOffset(5) + if err == nil { + t.Error("Expected error for seek on inactive subscription") + } + + _, err = sub.GetLag() + if err == nil { + t.Error("Expected error for GetLag on inactive subscription") + } + + _, err = sub.IsAtEnd() + if err == nil { + t.Error("Expected error for IsAtEnd on inactive subscription") + } + + _, err = 
sub.GetOffsetRange(10)
+	if err == nil {
+		t.Error("Expected error for GetOffsetRange on inactive subscription")
+	}
+}
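A minimal usage sketch of how these pieces are intended to fit together, based only on the constructors and methods introduced in this change: SQLOffsetStorage provides durable checkpoints and offset mappings, PartitionOffsetRegistry assigns sequential offsets per partition, and OffsetSubscriber/OffsetSubscription drive offset-based reads. The wiring below is illustrative, not part of the change; it assumes PartitionOffsetRegistry accepts any OffsetStorage implementation (the tests above only exercise it with the in-memory storage) and abbreviates error handling.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/mattn/go-sqlite3"
	"github.com/seaweedfs/seaweedfs/weed/mq/offset"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func main() {
	// SQLite-backed offset storage, as in sql_storage_test.go.
	db, err := sql.Open("sqlite3", "offsets.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	storage, err := offset.NewSQLOffsetStorage(db)
	if err != nil {
		log.Fatal(err)
	}
	defer storage.Close()

	partition := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 0,
		RangeStop:  31,
		UnixTimeNs: time.Now().UnixNano(),
	}

	// Assumed: the registry works with any OffsetStorage, not just the in-memory one.
	registry := offset.NewPartitionOffsetRegistry(storage)
	registry.AssignOffsets("example-ns", "example-topic", partition, 100) // stand-in for published records

	subscriber := offset.NewOffsetSubscriber(registry)
	sub, err := subscriber.CreateSubscription("example-sub", "example-ns", "example-topic",
		partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
	if err != nil {
		log.Fatal(err)
	}

	// Consume in batches of up to 50 offsets until the high water mark is reached.
	for {
		r, err := sub.GetOffsetRange(50)
		if err != nil || r.Count == 0 {
			break
		}
		fmt.Printf("fetch offsets %d..%d\n", r.StartOffset, r.EndOffset)
		sub.AdvanceOffsetBy(r.Count)
	}

	lag, _ := sub.GetLag()
	fmt.Println("remaining lag:", lag)
}

The same wiring should apply to the FilerOffsetStorage mentioned in storage.go; only the OffsetStorage construction changes.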
