Diffstat (limited to 'weed/mq/offset')
-rw-r--r--  weed/mq/offset/benchmark_test.go                454
-rw-r--r--  weed/mq/offset/consumer_group_storage.go        181
-rw-r--r--  weed/mq/offset/consumer_group_storage_test.go   128
-rw-r--r--  weed/mq/offset/end_to_end_test.go               472
-rw-r--r--  weed/mq/offset/filer_storage.go                 100
-rw-r--r--  weed/mq/offset/integration.go                   380
-rw-r--r--  weed/mq/offset/integration_test.go              544
-rw-r--r--  weed/mq/offset/manager.go                       343
-rw-r--r--  weed/mq/offset/manager_test.go                  388
-rw-r--r--  weed/mq/offset/memory_storage_test.go           228
-rw-r--r--  weed/mq/offset/migration.go                     302
-rw-r--r--  weed/mq/offset/sql_storage.go                   394
-rw-r--r--  weed/mq/offset/sql_storage_test.go              516
-rw-r--r--  weed/mq/offset/storage.go                         5
-rw-r--r--  weed/mq/offset/subscriber.go                    355
-rw-r--r--  weed/mq/offset/subscriber_test.go               457
16 files changed, 5247 insertions, 0 deletions
diff --git a/weed/mq/offset/benchmark_test.go b/weed/mq/offset/benchmark_test.go
new file mode 100644
index 000000000..d6f33206f
--- /dev/null
+++ b/weed/mq/offset/benchmark_test.go
@@ -0,0 +1,454 @@
+package offset
+
+import (
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ _ "github.com/mattn/go-sqlite3"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// BenchmarkOffsetAssignment benchmarks sequential offset assignment
+func BenchmarkOffsetAssignment(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ b.Fatalf("Failed to create partition manager: %v", err)
+ }
+
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ manager.AssignOffset()
+ }
+ })
+}
+
+// BenchmarkBatchOffsetAssignment benchmarks batch offset assignment
+func BenchmarkBatchOffsetAssignment(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ b.Fatalf("Failed to create partition manager: %v", err)
+ }
+
+ batchSizes := []int64{1, 10, 100, 1000}
+
+ for _, batchSize := range batchSizes {
+ b.Run(fmt.Sprintf("BatchSize%d", batchSize), func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ manager.AssignOffsets(batchSize)
+ }
+ })
+ }
+}
+
+// BenchmarkSQLOffsetStorage benchmarks SQL storage operations
+func BenchmarkSQLOffsetStorage(b *testing.B) {
+ // Create temporary database
+ tmpFile, err := os.CreateTemp("", "benchmark_*.db")
+ if err != nil {
+ b.Fatalf("Failed to create temp database: %v", err)
+ }
+ tmpFile.Close()
+ defer os.Remove(tmpFile.Name())
+
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ b.Fatalf("Failed to create database: %v", err)
+ }
+ defer db.Close()
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ b.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ partitionKey := partitionKey(partition)
+
+ b.Run("SaveCheckpoint", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ storage.SaveCheckpoint("test-namespace", "test-topic", partition, int64(i))
+ }
+ })
+
+ b.Run("LoadCheckpoint", func(b *testing.B) {
+ storage.SaveCheckpoint("test-namespace", "test-topic", partition, 1000)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ }
+ })
+
+ b.Run("SaveOffsetMapping", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), 100)
+ }
+ })
+
+ // Pre-populate for read benchmarks
+ for i := 0; i < 1000; i++ {
+ storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), 100)
+ }
+
+ b.Run("GetHighestOffset", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ storage.GetHighestOffset("test-namespace", "test-topic", partition)
+ }
+ })
+
+ b.Run("LoadOffsetMappings", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ storage.LoadOffsetMappings(partitionKey)
+ }
+ })
+
+ b.Run("GetOffsetMappingsByRange", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ start := int64(i % 900)
+ end := start + 100
+ storage.GetOffsetMappingsByRange(partitionKey, start, end)
+ }
+ })
+
+ b.Run("GetPartitionStats", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ storage.GetPartitionStats(partitionKey)
+ }
+ })
+}
+
+// BenchmarkInMemoryVsSQL compares in-memory and SQL storage performance
+func BenchmarkInMemoryVsSQL(b *testing.B) {
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ // In-memory storage benchmark
+ b.Run("InMemory", func(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ b.Fatalf("Failed to create partition manager: %v", err)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ manager.AssignOffset()
+ }
+ })
+
+ // SQL storage benchmark
+ b.Run("SQL", func(b *testing.B) {
+ tmpFile, err := os.CreateTemp("", "benchmark_sql_*.db")
+ if err != nil {
+ b.Fatalf("Failed to create temp database: %v", err)
+ }
+ tmpFile.Close()
+ defer os.Remove(tmpFile.Name())
+
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ b.Fatalf("Failed to create database: %v", err)
+ }
+ defer db.Close()
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ b.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ b.Fatalf("Failed to create partition manager: %v", err)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ manager.AssignOffset()
+ }
+ })
+}
+
+// BenchmarkOffsetSubscription benchmarks subscription operations
+func BenchmarkOffsetSubscription(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ // Pre-assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 10000)
+
+ b.Run("CreateSubscription", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ subscriptionID := fmt.Sprintf("bench-sub-%d", i)
+ sub, err := subscriber.CreateSubscription(
+ subscriptionID,
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ b.Fatalf("Failed to create subscription: %v", err)
+ }
+ subscriber.CloseSubscription(subscriptionID)
+ _ = sub
+ }
+ })
+
+ // Create subscription for other benchmarks
+ sub, err := subscriber.CreateSubscription(
+ "bench-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ b.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ b.Run("GetOffsetRange", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ sub.GetOffsetRange(100)
+ }
+ })
+
+ b.Run("AdvanceOffset", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ sub.AdvanceOffset()
+ }
+ })
+
+ b.Run("GetLag", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ sub.GetLag()
+ }
+ })
+
+ b.Run("SeekToOffset", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ offset := int64(i % 9000) // Stay within bounds
+ sub.SeekToOffset(offset)
+ }
+ })
+}
+
+// BenchmarkSMQOffsetIntegration benchmarks the full integration layer
+func BenchmarkSMQOffsetIntegration(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ b.Run("PublishRecord", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ key := fmt.Sprintf("key-%d", i)
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
+ }
+ })
+
+ b.Run("PublishRecordBatch", func(b *testing.B) {
+ batchSizes := []int{1, 10, 100}
+
+ for _, batchSize := range batchSizes {
+ b.Run(fmt.Sprintf("BatchSize%d", batchSize), func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ records := make([]PublishRecordRequest, batchSize)
+ for j := 0; j < batchSize; j++ {
+ records[j] = PublishRecordRequest{
+ Key: []byte(fmt.Sprintf("batch-%d-key-%d", i, j)),
+ Value: &schema_pb.RecordValue{},
+ }
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+ }
+ })
+ }
+ })
+
+ // Pre-populate for subscription benchmarks
+ records := make([]PublishRecordRequest, 1000)
+ for i := 0; i < 1000; i++ {
+ records[i] = PublishRecordRequest{
+ Key: []byte(fmt.Sprintf("pre-key-%d", i)),
+ Value: &schema_pb.RecordValue{},
+ }
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ b.Run("CreateSubscription", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ subscriptionID := fmt.Sprintf("integration-sub-%d", i)
+ sub, err := integration.CreateSubscription(
+ subscriptionID,
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ b.Fatalf("Failed to create subscription: %v", err)
+ }
+ integration.CloseSubscription(subscriptionID)
+ _ = sub
+ }
+ })
+
+ b.Run("GetHighWaterMark", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ }
+ })
+
+ b.Run("GetPartitionOffsetInfo", func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ }
+ })
+}
+
+// BenchmarkConcurrentOperations benchmarks concurrent offset operations
+func BenchmarkConcurrentOperations(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ b.Run("ConcurrentPublish", func(b *testing.B) {
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ i := 0
+ for pb.Next() {
+ key := fmt.Sprintf("concurrent-key-%d", i)
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
+ i++
+ }
+ })
+ })
+
+ // Pre-populate for concurrent reads
+ for i := 0; i < 1000; i++ {
+ key := fmt.Sprintf("read-key-%d", i)
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
+ }
+
+ b.Run("ConcurrentRead", func(b *testing.B) {
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ }
+ })
+ })
+
+ b.Run("ConcurrentMixed", func(b *testing.B) {
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ i := 0
+ for pb.Next() {
+ if i%10 == 0 {
+ // 10% writes
+ key := fmt.Sprintf("mixed-key-%d", i)
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
+ } else {
+ // 90% reads
+ integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ }
+ i++
+ }
+ })
+ })
+}
+
+// BenchmarkMemoryUsage benchmarks memory usage patterns
+func BenchmarkMemoryUsage(b *testing.B) {
+ b.Run("InMemoryStorage", func(b *testing.B) {
+ storage := NewInMemoryOffsetStorage()
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ b.Fatalf("Failed to create partition manager: %v", err)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ manager.AssignOffset()
+ if i%1000 == 0 {
+ // Periodic checkpoint to simulate real usage
+ manager.checkpoint(int64(i))
+ }
+ }
+ })
+}
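
Note (not part of the diff): a minimal sketch of the lower-level assignment API these benchmarks drive, using the same in-memory storage and partition shape. The wrapper package and function name are illustrative, and the results of the assignment calls are ignored here just as they are in the benchmarks.

package offsetdemo

import (
    "fmt"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/mq/offset"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// assignSketch drives the per-partition offset manager directly, mirroring
// BenchmarkOffsetAssignment and BenchmarkBatchOffsetAssignment above.
func assignSketch() error {
    storage := offset.NewInMemoryOffsetStorage()
    partition := &schema_pb.Partition{
        RingSize:   1024,
        RangeStart: 0,
        RangeStop:  31,
        UnixTimeNs: time.Now().UnixNano(),
    }

    manager, err := offset.NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
    if err != nil {
        return err
    }

    manager.AssignOffset()     // single sequential assignment
    manager.AssignOffsets(100) // batch assignment
    fmt.Printf("high water mark: %d\n", manager.GetHighWaterMark())
    return nil
}
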
diff --git a/weed/mq/offset/consumer_group_storage.go b/weed/mq/offset/consumer_group_storage.go
new file mode 100644
index 000000000..74c2db908
--- /dev/null
+++ b/weed/mq/offset/consumer_group_storage.go
@@ -0,0 +1,181 @@
+package offset
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// ConsumerGroupPosition represents a consumer's position in a partition
+// This can be either a timestamp or an offset
+type ConsumerGroupPosition struct {
+ Type string `json:"type"` // "offset" or "timestamp"
+ Value int64 `json:"value"` // The actual offset or timestamp value
+ OffsetType string `json:"offset_type"` // Optional: OffsetType enum name (e.g., "EXACT_OFFSET")
+ CommittedAt int64 `json:"committed_at"` // Unix timestamp in milliseconds when committed
+ Metadata string `json:"metadata"` // Optional: application-specific metadata
+}
+
+// ConsumerGroupOffsetStorage handles consumer group offset persistence
+// Each consumer group gets its own offset file in a dedicated consumers/ subfolder:
+// Path: /topics/{namespace}/{topic}/{version}/{partition}/consumers/{consumer_group}.offset
+type ConsumerGroupOffsetStorage interface {
+ // SaveConsumerGroupOffset saves the committed offset for a consumer group
+ SaveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error
+
+ // SaveConsumerGroupPosition saves the committed position (offset or timestamp) for a consumer group
+ SaveConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string, position *ConsumerGroupPosition) error
+
+ // LoadConsumerGroupOffset loads the committed offset for a consumer group (backward compatible)
+ LoadConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) (int64, error)
+
+ // LoadConsumerGroupPosition loads the committed position for a consumer group
+ LoadConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string) (*ConsumerGroupPosition, error)
+
+ // ListConsumerGroups returns all consumer groups for a topic partition
+ ListConsumerGroups(t topic.Topic, p topic.Partition) ([]string, error)
+
+ // DeleteConsumerGroupOffset removes the offset file for a consumer group
+ DeleteConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) error
+}
+
+// FilerConsumerGroupOffsetStorage implements ConsumerGroupOffsetStorage using SeaweedFS filer
+type FilerConsumerGroupOffsetStorage struct {
+ filerClientAccessor *filer_client.FilerClientAccessor
+}
+
+// NewFilerConsumerGroupOffsetStorageWithAccessor creates storage using a shared filer client accessor
+func NewFilerConsumerGroupOffsetStorageWithAccessor(filerClientAccessor *filer_client.FilerClientAccessor) *FilerConsumerGroupOffsetStorage {
+ return &FilerConsumerGroupOffsetStorage{
+ filerClientAccessor: filerClientAccessor,
+ }
+}
+
+// SaveConsumerGroupOffset saves the committed offset for a consumer group
+// Stores as: /topics/{namespace}/{topic}/{version}/{partition}/consumers/{consumer_group}.offset
+// This is a convenience method that wraps SaveConsumerGroupPosition
+func (f *FilerConsumerGroupOffsetStorage) SaveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
+ position := &ConsumerGroupPosition{
+ Type: "offset",
+ Value: offset,
+ OffsetType: schema_pb.OffsetType_EXACT_OFFSET.String(),
+ CommittedAt: time.Now().UnixMilli(),
+ }
+ return f.SaveConsumerGroupPosition(t, p, consumerGroup, position)
+}
+
+// SaveConsumerGroupPosition saves the committed position (offset or timestamp) for a consumer group
+// Stores as JSON: /topics/{namespace}/{topic}/{version}/{partition}/consumers/{consumer_group}.offset
+func (f *FilerConsumerGroupOffsetStorage) SaveConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string, position *ConsumerGroupPosition) error {
+ partitionDir := topic.PartitionDir(t, p)
+ consumersDir := fmt.Sprintf("%s/consumers", partitionDir)
+ offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
+
+ // Marshal position to JSON
+ jsonBytes, err := json.Marshal(position)
+ if err != nil {
+ return fmt.Errorf("failed to marshal position to JSON: %w", err)
+ }
+
+ return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, consumersDir, offsetFileName, jsonBytes)
+ })
+}
+
+// LoadConsumerGroupOffset loads the committed offset for a consumer group
+// This method provides backward compatibility and returns just the offset value
+func (f *FilerConsumerGroupOffsetStorage) LoadConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) (int64, error) {
+ position, err := f.LoadConsumerGroupPosition(t, p, consumerGroup)
+ if err != nil {
+ return -1, err
+ }
+ return position.Value, nil
+}
+
+// LoadConsumerGroupPosition loads the committed position for a consumer group
+func (f *FilerConsumerGroupOffsetStorage) LoadConsumerGroupPosition(t topic.Topic, p topic.Partition, consumerGroup string) (*ConsumerGroupPosition, error) {
+ partitionDir := topic.PartitionDir(t, p)
+ consumersDir := fmt.Sprintf("%s/consumers", partitionDir)
+ offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
+
+ var position *ConsumerGroupPosition
+ err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ data, err := filer.ReadInsideFiler(client, consumersDir, offsetFileName)
+ if err != nil {
+ return err
+ }
+
+ // Parse JSON format
+ position = &ConsumerGroupPosition{}
+ if err := json.Unmarshal(data, position); err != nil {
+ return fmt.Errorf("invalid consumer group offset file format: %w", err)
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return position, nil
+}
+
+// ListConsumerGroups returns all consumer groups for a topic partition
+func (f *FilerConsumerGroupOffsetStorage) ListConsumerGroups(t topic.Topic, p topic.Partition) ([]string, error) {
+ partitionDir := topic.PartitionDir(t, p)
+ consumersDir := fmt.Sprintf("%s/consumers", partitionDir)
+ var consumerGroups []string
+
+ err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Use ListEntries to get directory contents
+ stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
+ Directory: consumersDir,
+ })
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return err
+ }
+
+ entry := resp.Entry
+ if entry != nil && !entry.IsDirectory && entry.Name != "" {
+ // Check if this is a consumer group offset file (ends with .offset)
+ if len(entry.Name) > 7 && entry.Name[len(entry.Name)-7:] == ".offset" {
+ // Extract consumer group name (remove .offset suffix)
+ consumerGroup := entry.Name[:len(entry.Name)-7]
+ consumerGroups = append(consumerGroups, consumerGroup)
+ }
+ }
+ }
+ return nil
+ })
+
+ return consumerGroups, err
+}
+
+// DeleteConsumerGroupOffset removes the offset file for a consumer group
+func (f *FilerConsumerGroupOffsetStorage) DeleteConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string) error {
+ partitionDir := topic.PartitionDir(t, p)
+ consumersDir := fmt.Sprintf("%s/consumers", partitionDir)
+ offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
+
+ return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer_pb.DoRemove(context.Background(), client, consumersDir, offsetFileName, false, false, false, false, nil)
+ })
+}
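
Note (not part of the diff): a hedged sketch of committing and reading back a consumer group position through the filer-backed implementation above. The wrapper package, function name, and consumer group name are illustrative; the filer client accessor, topic, and partition are assumed to be wired up by the surrounding broker code.

package offsetdemo

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/filer_client"
    "github.com/seaweedfs/seaweedfs/weed/mq/offset"
    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
)

// commitAndReload commits offset 42 for a consumer group and reads the stored
// position back, exercising the ConsumerGroupOffsetStorage interface above.
func commitAndReload(fca *filer_client.FilerClientAccessor, t topic.Topic, p topic.Partition) error {
    store := offset.NewFilerConsumerGroupOffsetStorageWithAccessor(fca)

    // Writes /topics/{namespace}/{topic}/{version}/{partition}/consumers/my-group.offset
    if err := store.SaveConsumerGroupOffset(t, p, "my-group", 42); err != nil {
        return err
    }

    pos, err := store.LoadConsumerGroupPosition(t, p, "my-group")
    if err != nil {
        return err
    }
    fmt.Printf("committed %s position %d\n", pos.Type, pos.Value)
    return nil
}
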
diff --git a/weed/mq/offset/consumer_group_storage_test.go b/weed/mq/offset/consumer_group_storage_test.go
new file mode 100644
index 000000000..ff1163e93
--- /dev/null
+++ b/weed/mq/offset/consumer_group_storage_test.go
@@ -0,0 +1,128 @@
+package offset
+
+import (
+ "encoding/json"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func TestConsumerGroupPosition_JSON(t *testing.T) {
+ tests := []struct {
+ name string
+ position *ConsumerGroupPosition
+ }{
+ {
+ name: "offset-based position",
+ position: &ConsumerGroupPosition{
+ Type: "offset",
+ Value: 12345,
+ OffsetType: schema_pb.OffsetType_EXACT_OFFSET.String(),
+ CommittedAt: time.Now().UnixMilli(),
+ Metadata: "test metadata",
+ },
+ },
+ {
+ name: "timestamp-based position",
+ position: &ConsumerGroupPosition{
+ Type: "timestamp",
+ Value: time.Now().UnixNano(),
+ OffsetType: schema_pb.OffsetType_EXACT_TS_NS.String(),
+ CommittedAt: time.Now().UnixMilli(),
+ Metadata: "checkpoint at 2024-10-05",
+ },
+ },
+ {
+ name: "minimal position",
+ position: &ConsumerGroupPosition{
+ Type: "offset",
+ Value: 42,
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Marshal to JSON
+ jsonBytes, err := json.Marshal(tt.position)
+ if err != nil {
+ t.Fatalf("Failed to marshal: %v", err)
+ }
+
+ t.Logf("JSON: %s", string(jsonBytes))
+
+ // Unmarshal from JSON
+ var decoded ConsumerGroupPosition
+ if err := json.Unmarshal(jsonBytes, &decoded); err != nil {
+ t.Fatalf("Failed to unmarshal: %v", err)
+ }
+
+ // Verify fields
+ if decoded.Type != tt.position.Type {
+ t.Errorf("Type mismatch: got %s, want %s", decoded.Type, tt.position.Type)
+ }
+ if decoded.Value != tt.position.Value {
+ t.Errorf("Value mismatch: got %d, want %d", decoded.Value, tt.position.Value)
+ }
+ if decoded.OffsetType != tt.position.OffsetType {
+ t.Errorf("OffsetType mismatch: got %s, want %s", decoded.OffsetType, tt.position.OffsetType)
+ }
+ if decoded.Metadata != tt.position.Metadata {
+ t.Errorf("Metadata mismatch: got %s, want %s", decoded.Metadata, tt.position.Metadata)
+ }
+ })
+ }
+}
+
+func TestConsumerGroupPosition_JSONExamples(t *testing.T) {
+ // Test JSON format examples
+ jsonExamples := []string{
+ `{"type":"offset","value":12345}`,
+ `{"type":"timestamp","value":1696521600000000000}`,
+ `{"type":"offset","value":42,"offset_type":"EXACT_OFFSET","committed_at":1696521600000,"metadata":"test"}`,
+ }
+
+ for i, jsonStr := range jsonExamples {
+ var position ConsumerGroupPosition
+ if err := json.Unmarshal([]byte(jsonStr), &position); err != nil {
+ t.Errorf("Example %d: Failed to parse JSON: %v", i, err)
+ continue
+ }
+
+ t.Logf("Example %d: Type=%s, Value=%d", i, position.Type, position.Value)
+
+ // Verify required fields
+ if position.Type == "" {
+ t.Errorf("Example %d: Type is empty", i)
+ }
+ if position.Value == 0 {
+ t.Errorf("Example %d: Value is zero", i)
+ }
+ }
+}
+
+func TestConsumerGroupPosition_TypeValidation(t *testing.T) {
+ validTypes := []string{"offset", "timestamp"}
+
+ for _, typ := range validTypes {
+ position := &ConsumerGroupPosition{
+ Type: typ,
+ Value: 100,
+ }
+
+ jsonBytes, err := json.Marshal(position)
+ if err != nil {
+ t.Fatalf("Failed to marshal position with type '%s': %v", typ, err)
+ }
+
+ var decoded ConsumerGroupPosition
+ if err := json.Unmarshal(jsonBytes, &decoded); err != nil {
+ t.Fatalf("Failed to unmarshal position with type '%s': %v", typ, err)
+ }
+
+ if decoded.Type != typ {
+ t.Errorf("Type mismatch: got '%s', want '%s'", decoded.Type, typ)
+ }
+ }
+}
diff --git a/weed/mq/offset/end_to_end_test.go b/weed/mq/offset/end_to_end_test.go
new file mode 100644
index 000000000..a4db891e1
--- /dev/null
+++ b/weed/mq/offset/end_to_end_test.go
@@ -0,0 +1,472 @@
+package offset
+
+import (
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ _ "github.com/mattn/go-sqlite3"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// TestEndToEndOffsetFlow tests the complete offset management flow
+func TestEndToEndOffsetFlow(t *testing.T) {
+ // Create temporary database
+ tmpFile, err := os.CreateTemp("", "e2e_offset_test_*.db")
+ if err != nil {
+ t.Fatalf("Failed to create temp database: %v", err)
+ }
+ tmpFile.Close()
+ defer os.Remove(tmpFile.Name())
+
+ // Create database with migrations
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to create database: %v", err)
+ }
+ defer db.Close()
+
+ // Create SQL storage
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Create SMQ offset integration
+ integration := NewSMQOffsetIntegration(storage)
+
+ // Test partition
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ t.Run("PublishAndAssignOffsets", func(t *testing.T) {
+ // Simulate publishing messages with offset assignment
+ records := []PublishRecordRequest{
+ {Key: []byte("user1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("user2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("user3"), Value: &schema_pb.RecordValue{}},
+ }
+
+ response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+ if err != nil {
+ t.Fatalf("Failed to publish record batch: %v", err)
+ }
+
+ if response.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
+ }
+
+ if response.LastOffset != 2 {
+ t.Errorf("Expected last offset 2, got %d", response.LastOffset)
+ }
+
+ // Verify high water mark
+ hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark: %v", err)
+ }
+
+ if hwm != 3 {
+ t.Errorf("Expected high water mark 3, got %d", hwm)
+ }
+ })
+
+ t.Run("CreateAndUseSubscription", func(t *testing.T) {
+ // Create subscription from earliest
+ sub, err := integration.CreateSubscription(
+ "e2e-test-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Subscribe to records
+ responses, err := integration.SubscribeRecords(sub, 2)
+ if err != nil {
+ t.Fatalf("Failed to subscribe to records: %v", err)
+ }
+
+ if len(responses) != 2 {
+ t.Errorf("Expected 2 responses, got %d", len(responses))
+ }
+
+ // Check subscription advancement
+ if sub.CurrentOffset != 2 {
+ t.Errorf("Expected current offset 2, got %d", sub.CurrentOffset)
+ }
+
+ // Get subscription lag
+ lag, err := sub.GetLag()
+ if err != nil {
+ t.Fatalf("Failed to get lag: %v", err)
+ }
+
+ if lag != 1 { // 3 (hwm) - 2 (current) = 1
+ t.Errorf("Expected lag 1, got %d", lag)
+ }
+ })
+
+ t.Run("OffsetSeekingAndRanges", func(t *testing.T) {
+ // Create subscription at specific offset
+ sub, err := integration.CreateSubscription(
+ "seek-test-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_EXACT_OFFSET,
+ 1,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription at offset 1: %v", err)
+ }
+
+ // Verify starting position
+ if sub.CurrentOffset != 1 {
+ t.Errorf("Expected current offset 1, got %d", sub.CurrentOffset)
+ }
+
+ // Get offset range
+ offsetRange, err := sub.GetOffsetRange(2)
+ if err != nil {
+ t.Fatalf("Failed to get offset range: %v", err)
+ }
+
+ if offsetRange.StartOffset != 1 {
+ t.Errorf("Expected start offset 1, got %d", offsetRange.StartOffset)
+ }
+
+ if offsetRange.Count != 2 {
+ t.Errorf("Expected count 2, got %d", offsetRange.Count)
+ }
+
+ // Seek to different offset
+ err = sub.SeekToOffset(0)
+ if err != nil {
+ t.Fatalf("Failed to seek to offset 0: %v", err)
+ }
+
+ if sub.CurrentOffset != 0 {
+ t.Errorf("Expected current offset 0 after seek, got %d", sub.CurrentOffset)
+ }
+ })
+
+ t.Run("PartitionInformationAndMetrics", func(t *testing.T) {
+ // Get partition offset info
+ info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info: %v", err)
+ }
+
+ if info.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
+ }
+
+ if info.LatestOffset != 2 {
+ t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
+ }
+
+ if info.HighWaterMark != 3 {
+ t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
+ }
+
+ if info.ActiveSubscriptions != 2 { // Two subscriptions created above
+ t.Errorf("Expected 2 active subscriptions, got %d", info.ActiveSubscriptions)
+ }
+
+ // Get offset metrics
+ metrics := integration.GetOffsetMetrics()
+ if metrics.PartitionCount != 1 {
+ t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
+ }
+
+ if metrics.ActiveSubscriptions != 2 {
+ t.Errorf("Expected 2 active subscriptions in metrics, got %d", metrics.ActiveSubscriptions)
+ }
+ })
+}
+
+// TestOffsetPersistenceAcrossRestarts tests that offsets persist across system restarts
+func TestOffsetPersistenceAcrossRestarts(t *testing.T) {
+ // Create temporary database
+ tmpFile, err := os.CreateTemp("", "persistence_test_*.db")
+ if err != nil {
+ t.Fatalf("Failed to create temp database: %v", err)
+ }
+ tmpFile.Close()
+ defer os.Remove(tmpFile.Name())
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ var lastOffset int64
+
+ // First session: Create database and assign offsets
+ {
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to create database: %v", err)
+ }
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+
+ integration := NewSMQOffsetIntegration(storage)
+
+ // Publish some records
+ records := []PublishRecordRequest{
+ {Key: []byte("msg1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("msg2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("msg3"), Value: &schema_pb.RecordValue{}},
+ }
+
+ response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+ if err != nil {
+ t.Fatalf("Failed to publish records: %v", err)
+ }
+
+ lastOffset = response.LastOffset
+
+ // Close connections
+ storage.Close()
+ db.Close()
+ }
+
+ // Second session: Reopen database and verify persistence
+ {
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to reopen database: %v", err)
+ }
+ defer db.Close()
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ integration := NewSMQOffsetIntegration(storage)
+
+ // Verify high water mark persisted
+ hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark after restart: %v", err)
+ }
+
+ if hwm != lastOffset+1 {
+ t.Errorf("Expected high water mark %d after restart, got %d", lastOffset+1, hwm)
+ }
+
+ // Assign new offsets and verify continuity
+ newResponse, err := integration.PublishRecord("test-namespace", "test-topic", partition, []byte("msg4"), &schema_pb.RecordValue{})
+ if err != nil {
+ t.Fatalf("Failed to publish new record after restart: %v", err)
+ }
+
+ expectedNextOffset := lastOffset + 1
+ if newResponse.BaseOffset != expectedNextOffset {
+ t.Errorf("Expected next offset %d after restart, got %d", expectedNextOffset, newResponse.BaseOffset)
+ }
+ }
+}
+
+// TestConcurrentOffsetOperations tests concurrent offset operations
+func TestConcurrentOffsetOperations(t *testing.T) {
+ // Create temporary database
+ tmpFile, err := os.CreateTemp("", "concurrent_test_*.db")
+ if err != nil {
+ t.Fatalf("Failed to create temp database: %v", err)
+ }
+ tmpFile.Close()
+ defer os.Remove(tmpFile.Name())
+
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to create database: %v", err)
+ }
+ defer db.Close()
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ integration := NewSMQOffsetIntegration(storage)
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ // Concurrent publishers
+ const numPublishers = 5
+ const recordsPerPublisher = 10
+
+ done := make(chan bool, numPublishers)
+
+ for i := 0; i < numPublishers; i++ {
+ go func(publisherID int) {
+ defer func() { done <- true }()
+
+ for j := 0; j < recordsPerPublisher; j++ {
+ key := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
+ _, err := integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
+ if err != nil {
+ t.Errorf("Publisher %d failed to publish message %d: %v", publisherID, j, err)
+ return
+ }
+ }
+ }(i)
+ }
+
+ // Wait for all publishers to complete
+ for i := 0; i < numPublishers; i++ {
+ <-done
+ }
+
+ // Verify total records
+ hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark: %v", err)
+ }
+
+ expectedTotal := int64(numPublishers * recordsPerPublisher)
+ if hwm != expectedTotal {
+ t.Errorf("Expected high water mark %d, got %d", expectedTotal, hwm)
+ }
+
+ // Verify no duplicate offsets
+ info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get partition info: %v", err)
+ }
+
+ if info.RecordCount != expectedTotal {
+ t.Errorf("Expected record count %d, got %d", expectedTotal, info.RecordCount)
+ }
+}
+
+// TestOffsetValidationAndErrorHandling tests error conditions and validation
+func TestOffsetValidationAndErrorHandling(t *testing.T) {
+ // Create temporary database
+ tmpFile, err := os.CreateTemp("", "validation_test_*.db")
+ if err != nil {
+ t.Fatalf("Failed to create temp database: %v", err)
+ }
+ tmpFile.Close()
+ defer os.Remove(tmpFile.Name())
+
+ db, err := CreateDatabase(tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to create database: %v", err)
+ }
+ defer db.Close()
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ integration := NewSMQOffsetIntegration(storage)
+
+ partition := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ t.Run("InvalidOffsetSubscription", func(t *testing.T) {
+ // Try to create subscription with invalid offset
+ _, err := integration.CreateSubscription(
+ "invalid-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_EXACT_OFFSET,
+ 100, // Beyond any existing data
+ )
+ if err == nil {
+ t.Error("Expected error for subscription beyond high water mark")
+ }
+ })
+
+ t.Run("NegativeOffsetValidation", func(t *testing.T) {
+ // Try to create subscription with negative offset
+ _, err := integration.CreateSubscription(
+ "negative-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_EXACT_OFFSET,
+ -1,
+ )
+ if err == nil {
+ t.Error("Expected error for negative offset")
+ }
+ })
+
+ t.Run("DuplicateSubscriptionID", func(t *testing.T) {
+ // Create first subscription
+ _, err := integration.CreateSubscription(
+ "duplicate-id",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create first subscription: %v", err)
+ }
+
+ // Try to create duplicate
+ _, err = integration.CreateSubscription(
+ "duplicate-id",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err == nil {
+ t.Error("Expected error for duplicate subscription ID")
+ }
+ })
+
+ t.Run("OffsetRangeValidation", func(t *testing.T) {
+ // Add some data first
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte("test"), &schema_pb.RecordValue{})
+
+ // Test invalid range validation
+ err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 5, 10) // Beyond high water mark
+ if err == nil {
+ t.Error("Expected error for range beyond high water mark")
+ }
+
+ err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 10, 5) // End before start
+ if err == nil {
+ t.Error("Expected error for end offset before start offset")
+ }
+
+ err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, -1, 5) // Negative start
+ if err == nil {
+ t.Error("Expected error for negative start offset")
+ }
+ })
+}
diff --git a/weed/mq/offset/filer_storage.go b/weed/mq/offset/filer_storage.go
new file mode 100644
index 000000000..81be78470
--- /dev/null
+++ b/weed/mq/offset/filer_storage.go
@@ -0,0 +1,100 @@
+package offset
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// FilerOffsetStorage implements OffsetStorage using SeaweedFS filer
+// Stores offset data as files in the same directory structure as SMQ
+// Path: /topics/{namespace}/{topic}/{version}/{partition}/checkpoint.offset
+// The namespace and topic are derived from the actual partition information
+type FilerOffsetStorage struct {
+ filerClientAccessor *filer_client.FilerClientAccessor
+}
+
+// NewFilerOffsetStorageWithAccessor creates a new filer-based offset storage using existing filer client accessor
+func NewFilerOffsetStorageWithAccessor(filerClientAccessor *filer_client.FilerClientAccessor) *FilerOffsetStorage {
+ return &FilerOffsetStorage{
+ filerClientAccessor: filerClientAccessor,
+ }
+}
+
+// SaveCheckpoint saves the checkpoint for a partition
+// Stores as: /topics/{namespace}/{topic}/{version}/{partition}/checkpoint.offset
+func (f *FilerOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
+ partitionDir := f.getPartitionDir(namespace, topicName, partition)
+ fileName := "checkpoint.offset"
+
+ // Use SMQ's 8-byte offset format
+ offsetBytes := make([]byte, 8)
+ util.Uint64toBytes(offsetBytes, uint64(offset))
+
+ return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, partitionDir, fileName, offsetBytes)
+ })
+}
+
+// LoadCheckpoint loads the checkpoint for a partition
+func (f *FilerOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ partitionDir := f.getPartitionDir(namespace, topicName, partition)
+ fileName := "checkpoint.offset"
+
+ var offset int64 = -1
+ err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ data, err := filer.ReadInsideFiler(client, partitionDir, fileName)
+ if err != nil {
+ return err
+ }
+ if len(data) != 8 {
+ return fmt.Errorf("invalid checkpoint file format: expected 8 bytes, got %d", len(data))
+ }
+ offset = int64(util.BytesToUint64(data))
+ return nil
+ })
+
+ if err != nil {
+ return -1, err
+ }
+
+ return offset, nil
+}
+
+// GetHighestOffset returns the highest offset stored for a partition
+// For filer storage, this is the same as the checkpoint since we don't store individual records
+func (f *FilerOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ return f.LoadCheckpoint(namespace, topicName, partition)
+}
+
+// Reset clears all data for testing
+func (f *FilerOffsetStorage) Reset() error {
+ // For testing, we could delete all offset files, but this is dangerous
+ // Instead, just return success - individual tests should clean up their own data
+ return nil
+}
+
+// Helper methods
+
+// getPartitionDir returns the directory path for a partition following SMQ convention
+// Format: /topics/{namespace}/{topic}/{version}/{partition}
+func (f *FilerOffsetStorage) getPartitionDir(namespace, topicName string, partition *schema_pb.Partition) string {
+ // Generate version from UnixTimeNs
+ version := time.Unix(0, partition.UnixTimeNs).UTC().Format("v2006-01-02-15-04-05")
+
+ // Generate partition range string
+ partitionRange := fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
+
+ return fmt.Sprintf("%s/%s/%s/%s/%s", filer.TopicsDir, namespace, topicName, version, partitionRange)
+}
+
+// getPartitionKey generates a unique key for a partition
+func (f *FilerOffsetStorage) getPartitionKey(partition *schema_pb.Partition) string {
+ return fmt.Sprintf("ring:%d:range:%d-%d:time:%d",
+ partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs)
+}
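
Note (not part of the diff): a hedged sketch of the checkpoint round trip described above. The wrapper package, function name, and the "demo"/"events" namespace and topic are illustrative; with this partition the checkpoint lands under a path of the form /topics/demo/events/v{YYYY-MM-DD-hh-mm-ss}/0000-0031/checkpoint.offset, stored as an 8-byte offset.

package offsetdemo

import (
    "time"

    "github.com/seaweedfs/seaweedfs/weed/filer_client"
    "github.com/seaweedfs/seaweedfs/weed/mq/offset"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// checkpointRoundTrip saves a checkpoint and loads it back via FilerOffsetStorage.
func checkpointRoundTrip(fca *filer_client.FilerClientAccessor) (int64, error) {
    store := offset.NewFilerOffsetStorageWithAccessor(fca)
    partition := &schema_pb.Partition{
        RingSize:   1024,
        RangeStart: 0,
        RangeStop:  31,
        UnixTimeNs: time.Now().UnixNano(),
    }

    if err := store.SaveCheckpoint("demo", "events", partition, 128); err != nil {
        return -1, err
    }
    return store.LoadCheckpoint("demo", "events", partition)
}
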
diff --git a/weed/mq/offset/integration.go b/weed/mq/offset/integration.go
new file mode 100644
index 000000000..4b9ee6183
--- /dev/null
+++ b/weed/mq/offset/integration.go
@@ -0,0 +1,380 @@
+package offset
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// SMQOffsetIntegration provides integration between offset management and SMQ broker
+type SMQOffsetIntegration struct {
+ mu sync.RWMutex
+ offsetAssigner *OffsetAssigner
+ offsetSubscriber *OffsetSubscriber
+ offsetSeeker *OffsetSeeker
+}
+
+// NewSMQOffsetIntegration creates a new SMQ offset integration
+func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration {
+ registry := NewPartitionOffsetRegistry(storage)
+ assigner := &OffsetAssigner{registry: registry}
+
+ return &SMQOffsetIntegration{
+ offsetAssigner: assigner,
+ offsetSubscriber: NewOffsetSubscriber(registry),
+ offsetSeeker: NewOffsetSeeker(registry),
+ }
+}
+
+// PublishRecord publishes a record and assigns it an offset
+func (integration *SMQOffsetIntegration) PublishRecord(
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ key []byte,
+ value *schema_pb.RecordValue,
+) (*mq_agent_pb.PublishRecordResponse, error) {
+
+ // Assign offset for this record
+ result := integration.offsetAssigner.AssignSingleOffset(namespace, topicName, partition)
+ if result.Error != nil {
+ return &mq_agent_pb.PublishRecordResponse{
+ Error: fmt.Sprintf("Failed to assign offset: %v", result.Error),
+ }, nil
+ }
+
+ assignment := result.Assignment
+
+ // Note: Removed in-memory mapping storage to prevent memory leaks
+ // Record-to-offset mappings are now handled by persistent storage layer
+
+ // Return response with offset information
+ return &mq_agent_pb.PublishRecordResponse{
+ AckSequence: assignment.Offset, // Use offset as ack sequence for now
+ BaseOffset: assignment.Offset,
+ LastOffset: assignment.Offset,
+ Error: "",
+ }, nil
+}
+
+// PublishRecordBatch publishes a batch of records and assigns them offsets
+func (integration *SMQOffsetIntegration) PublishRecordBatch(
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ records []PublishRecordRequest,
+) (*mq_agent_pb.PublishRecordResponse, error) {
+
+ if len(records) == 0 {
+ return &mq_agent_pb.PublishRecordResponse{
+ Error: "Empty record batch",
+ }, nil
+ }
+
+ // Assign batch of offsets
+ result := integration.offsetAssigner.AssignBatchOffsets(namespace, topicName, partition, int64(len(records)))
+ if result.Error != nil {
+ return &mq_agent_pb.PublishRecordResponse{
+ Error: fmt.Sprintf("Failed to assign batch offsets: %v", result.Error),
+ }, nil
+ }
+
+ batch := result.Batch
+
+ // Note: Removed in-memory mapping storage to prevent memory leaks
+ // Batch record-to-offset mappings are now handled by persistent storage layer
+
+ return &mq_agent_pb.PublishRecordResponse{
+ AckSequence: batch.LastOffset, // Use last offset as ack sequence
+ BaseOffset: batch.BaseOffset,
+ LastOffset: batch.LastOffset,
+ Error: "",
+ }, nil
+}
+
+// CreateSubscription creates an offset-based subscription
+func (integration *SMQOffsetIntegration) CreateSubscription(
+ subscriptionID string,
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ offsetType schema_pb.OffsetType,
+ startOffset int64,
+) (*OffsetSubscription, error) {
+
+ return integration.offsetSubscriber.CreateSubscription(
+ subscriptionID,
+ namespace, topicName,
+ partition,
+ offsetType,
+ startOffset,
+ )
+}
+
+// SubscribeRecords subscribes to records starting from a specific offset
+func (integration *SMQOffsetIntegration) SubscribeRecords(
+ subscription *OffsetSubscription,
+ maxRecords int64,
+) ([]*mq_agent_pb.SubscribeRecordResponse, error) {
+
+ if !subscription.IsActive {
+ return nil, fmt.Errorf("subscription is not active")
+ }
+
+ // Get the range of offsets to read
+ offsetRange, err := subscription.GetOffsetRange(maxRecords)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get offset range: %w", err)
+ }
+
+ if offsetRange.Count == 0 {
+ // No records available
+ return []*mq_agent_pb.SubscribeRecordResponse{}, nil
+ }
+
+ // TODO: This is where we would integrate with SMQ's actual storage layer
+ // For now, return mock responses with offset information
+ responses := make([]*mq_agent_pb.SubscribeRecordResponse, offsetRange.Count)
+
+ for i := int64(0); i < offsetRange.Count; i++ {
+ offset := offsetRange.StartOffset + i
+
+ responses[i] = &mq_agent_pb.SubscribeRecordResponse{
+ Key: []byte(fmt.Sprintf("key-%d", offset)),
+ Value: &schema_pb.RecordValue{}, // Mock value
+ TsNs: offset * 1000000, // Mock timestamp based on offset
+ Offset: offset,
+ IsEndOfStream: false,
+ IsEndOfTopic: false,
+ Error: "",
+ }
+ }
+
+ // Advance the subscription
+ subscription.AdvanceOffsetBy(offsetRange.Count)
+
+ return responses, nil
+}
+
+// GetHighWaterMark returns the high water mark for a partition
+func (integration *SMQOffsetIntegration) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ return integration.offsetAssigner.GetHighWaterMark(namespace, topicName, partition)
+}
+
+// SeekSubscription seeks a subscription to a specific offset
+func (integration *SMQOffsetIntegration) SeekSubscription(
+ subscriptionID string,
+ offset int64,
+) error {
+
+ subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID)
+ if err != nil {
+ return fmt.Errorf("subscription not found: %w", err)
+ }
+
+ return subscription.SeekToOffset(offset)
+}
+
+// GetSubscriptionLag returns the lag for a subscription
+func (integration *SMQOffsetIntegration) GetSubscriptionLag(subscriptionID string) (int64, error) {
+ subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID)
+ if err != nil {
+ return 0, fmt.Errorf("subscription not found: %w", err)
+ }
+
+ return subscription.GetLag()
+}
+
+// CloseSubscription closes a subscription
+func (integration *SMQOffsetIntegration) CloseSubscription(subscriptionID string) error {
+ return integration.offsetSubscriber.CloseSubscription(subscriptionID)
+}
+
+// ValidateOffsetRange validates an offset range for a partition
+func (integration *SMQOffsetIntegration) ValidateOffsetRange(
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ startOffset, endOffset int64,
+) error {
+
+ return integration.offsetSeeker.ValidateOffsetRange(namespace, topicName, partition, startOffset, endOffset)
+}
+
+// GetAvailableOffsetRange returns the available offset range for a partition
+func (integration *SMQOffsetIntegration) GetAvailableOffsetRange(namespace, topicName string, partition *schema_pb.Partition) (*OffsetRange, error) {
+ return integration.offsetSeeker.GetAvailableOffsetRange(namespace, topicName, partition)
+}
+
+// PublishRecordRequest represents a record to be published
+type PublishRecordRequest struct {
+ Key []byte
+ Value *schema_pb.RecordValue
+}
+
+// OffsetMetrics provides metrics about offset usage
+type OffsetMetrics struct {
+ PartitionCount int64
+ TotalOffsets int64
+ ActiveSubscriptions int64
+ AverageLatency float64
+}
+
+// GetOffsetMetrics returns metrics about offset usage
+func (integration *SMQOffsetIntegration) GetOffsetMetrics() *OffsetMetrics {
+ integration.mu.RLock()
+ defer integration.mu.RUnlock()
+
+ // Count active subscriptions
+ activeSubscriptions := int64(0)
+ for _, subscription := range integration.offsetSubscriber.subscriptions {
+ if subscription.IsActive {
+ activeSubscriptions++
+ }
+ }
+
+ // Calculate total offsets from all partition managers instead of in-memory map
+ var totalOffsets int64
+ for _, manager := range integration.offsetAssigner.registry.managers {
+ totalOffsets += manager.GetHighWaterMark()
+ }
+
+ return &OffsetMetrics{
+ PartitionCount: int64(len(integration.offsetAssigner.registry.managers)),
+ TotalOffsets: totalOffsets, // Now calculated from storage, not memory maps
+ ActiveSubscriptions: activeSubscriptions,
+ AverageLatency: 0.0, // TODO: Implement latency tracking
+ }
+}
+
+// OffsetInfo provides detailed information about an offset
+type OffsetInfo struct {
+ Offset int64
+ Timestamp int64
+ Partition *schema_pb.Partition
+ Exists bool
+}
+
+// GetOffsetInfo returns detailed information about a specific offset
+func (integration *SMQOffsetIntegration) GetOffsetInfo(
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ offset int64,
+) (*OffsetInfo, error) {
+
+ hwm, err := integration.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ exists := offset >= 0 && offset < hwm
+
+ // TODO: Get actual timestamp from storage
+ timestamp := int64(0)
+ // Note: Timestamp lookup from in-memory map removed to prevent memory leaks
+ // For now, use a placeholder timestamp. In production, this should come from
+ // persistent storage if timestamp tracking is needed.
+ if exists {
+ timestamp = time.Now().UnixNano() // Placeholder - should come from storage
+ }
+
+ return &OffsetInfo{
+ Offset: offset,
+ Timestamp: timestamp,
+ Partition: partition,
+ Exists: exists,
+ }, nil
+}
+
+// PartitionOffsetInfo provides offset information for a partition
+type PartitionOffsetInfo struct {
+ Partition *schema_pb.Partition
+ EarliestOffset int64
+ LatestOffset int64
+ HighWaterMark int64
+ RecordCount int64
+ ActiveSubscriptions int64
+}
+
+// GetPartitionOffsetInfo returns comprehensive offset information for a partition
+func (integration *SMQOffsetIntegration) GetPartitionOffsetInfo(namespace, topicName string, partition *schema_pb.Partition) (*PartitionOffsetInfo, error) {
+ hwm, err := integration.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ earliestOffset := int64(0)
+ latestOffset := hwm - 1
+ if hwm == 0 {
+ latestOffset = -1 // No records
+ }
+
+ // Count active subscriptions for this partition
+ activeSubscriptions := int64(0)
+ integration.mu.RLock()
+ for _, subscription := range integration.offsetSubscriber.subscriptions {
+ if subscription.IsActive && partitionKey(subscription.Partition) == partitionKey(partition) {
+ activeSubscriptions++
+ }
+ }
+ integration.mu.RUnlock()
+
+ return &PartitionOffsetInfo{
+ Partition: partition,
+ EarliestOffset: earliestOffset,
+ LatestOffset: latestOffset,
+ HighWaterMark: hwm,
+ RecordCount: hwm,
+ ActiveSubscriptions: activeSubscriptions,
+ }, nil
+}
+
+// GetSubscription retrieves an existing subscription
+func (integration *SMQOffsetIntegration) GetSubscription(subscriptionID string) (*OffsetSubscription, error) {
+ return integration.offsetSubscriber.GetSubscription(subscriptionID)
+}
+
+// ListActiveSubscriptions returns all active subscriptions
+func (integration *SMQOffsetIntegration) ListActiveSubscriptions() ([]*OffsetSubscription, error) {
+ integration.mu.RLock()
+ defer integration.mu.RUnlock()
+
+ result := make([]*OffsetSubscription, 0)
+ for _, subscription := range integration.offsetSubscriber.subscriptions {
+ if subscription.IsActive {
+ result = append(result, subscription)
+ }
+ }
+
+ return result, nil
+}
+
+// AssignSingleOffset assigns a single offset for a partition
+func (integration *SMQOffsetIntegration) AssignSingleOffset(namespace, topicName string, partition *schema_pb.Partition) *AssignmentResult {
+ return integration.offsetAssigner.AssignSingleOffset(namespace, topicName, partition)
+}
+
+// AssignBatchOffsets assigns a batch of offsets for a partition
+func (integration *SMQOffsetIntegration) AssignBatchOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) *AssignmentResult {
+ return integration.offsetAssigner.AssignBatchOffsets(namespace, topicName, partition, count)
+}
+
+// Reset resets the integration layer state (for testing)
+func (integration *SMQOffsetIntegration) Reset() {
+ integration.mu.Lock()
+ defer integration.mu.Unlock()
+
+ // Note: No in-memory maps to clear (removed to prevent memory leaks)
+
+ // Close all subscriptions
+ for _, subscription := range integration.offsetSubscriber.subscriptions {
+ subscription.IsActive = false
+ }
+ integration.offsetSubscriber.subscriptions = make(map[string]*OffsetSubscription)
+
+ // Reset the registries by creating new ones with the same storage
+ // This ensures that partition managers start fresh
+ registry := NewPartitionOffsetRegistry(integration.offsetAssigner.registry.storage)
+ integration.offsetAssigner.registry = registry
+ integration.offsetSubscriber.offsetRegistry = registry
+ integration.offsetSeeker.offsetRegistry = registry
+}
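
Note (not part of the diff): a short, hedged sketch tying the integration API together — the same calls the tests below exercise: publish one record, open an earliest-offset subscription, read, and inspect lag. The in-memory storage, wrapper package, function name, and "demo"/"events" names are purely illustrative.

package offsetdemo

import (
    "fmt"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/mq/offset"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// publishAndConsume sketches the SMQOffsetIntegration publish/subscribe flow.
func publishAndConsume() error {
    integration := offset.NewSMQOffsetIntegration(offset.NewInMemoryOffsetStorage())
    partition := &schema_pb.Partition{
        RingSize:   1024,
        RangeStart: 0,
        RangeStop:  31,
        UnixTimeNs: time.Now().UnixNano(),
    }

    if _, err := integration.PublishRecord("demo", "events", partition, []byte("k1"), &schema_pb.RecordValue{}); err != nil {
        return err
    }

    sub, err := integration.CreateSubscription("demo-sub", "demo", "events", partition,
        schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
    if err != nil {
        return err
    }
    defer integration.CloseSubscription("demo-sub")

    responses, err := integration.SubscribeRecords(sub, 10)
    if err != nil {
        return err
    }
    lag, err := integration.GetSubscriptionLag("demo-sub")
    if err != nil {
        return err
    }
    fmt.Printf("read %d records, lag %d\n", len(responses), lag)
    return nil
}
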
diff --git a/weed/mq/offset/integration_test.go b/weed/mq/offset/integration_test.go
new file mode 100644
index 000000000..35299be65
--- /dev/null
+++ b/weed/mq/offset/integration_test.go
@@ -0,0 +1,544 @@
+package offset
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func TestSMQOffsetIntegration_PublishRecord(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish a single record
+ response, err := integration.PublishRecord(
+ "test-namespace", "test-topic",
+ partition,
+ []byte("test-key"),
+ &schema_pb.RecordValue{},
+ )
+
+ if err != nil {
+ t.Fatalf("Failed to publish record: %v", err)
+ }
+
+ if response.Error != "" {
+ t.Errorf("Expected no error, got: %s", response.Error)
+ }
+
+ if response.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
+ }
+
+ if response.LastOffset != 0 {
+ t.Errorf("Expected last offset 0, got %d", response.LastOffset)
+ }
+}
+
+func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Create batch of records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+
+ // Publish batch
+ response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+ if err != nil {
+ t.Fatalf("Failed to publish record batch: %v", err)
+ }
+
+ if response.Error != "" {
+ t.Errorf("Expected no error, got: %s", response.Error)
+ }
+
+ if response.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
+ }
+
+ if response.LastOffset != 2 {
+ t.Errorf("Expected last offset 2, got %d", response.LastOffset)
+ }
+
+ // Verify high water mark
+ hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark: %v", err)
+ }
+
+ if hwm != 3 {
+ t.Errorf("Expected high water mark 3, got %d", hwm)
+ }
+}
+
+func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish empty batch
+ response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, []PublishRecordRequest{})
+ if err != nil {
+ t.Fatalf("Failed to publish empty batch: %v", err)
+ }
+
+ if response.Error == "" {
+ t.Error("Expected error for empty batch")
+ }
+}
+
+func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish some records first
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ sub, err := integration.CreateSubscription(
+ "test-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ if sub.ID != "test-sub" {
+ t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
+ }
+
+ if sub.StartOffset != 0 {
+ t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
+ }
+}
+
+func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish some records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ sub, err := integration.CreateSubscription(
+ "test-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Subscribe to records
+ responses, err := integration.SubscribeRecords(sub, 2)
+ if err != nil {
+ t.Fatalf("Failed to subscribe to records: %v", err)
+ }
+
+ if len(responses) != 2 {
+ t.Errorf("Expected 2 responses, got %d", len(responses))
+ }
+
+ // Check offset progression
+ if responses[0].Offset != 0 {
+ t.Errorf("Expected first record offset 0, got %d", responses[0].Offset)
+ }
+
+ if responses[1].Offset != 1 {
+ t.Errorf("Expected second record offset 1, got %d", responses[1].Offset)
+ }
+
+ // Check subscription advancement
+ if sub.CurrentOffset != 2 {
+ t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset)
+ }
+}
+
+func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Create subscription on empty partition
+ sub, err := integration.CreateSubscription(
+ "empty-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Subscribe to records (should return empty)
+ responses, err := integration.SubscribeRecords(sub, 10)
+ if err != nil {
+ t.Fatalf("Failed to subscribe to empty partition: %v", err)
+ }
+
+ if len(responses) != 0 {
+ t.Errorf("Expected 0 responses from empty partition, got %d", len(responses))
+ }
+}
+
+func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key4"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key5"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ sub, err := integration.CreateSubscription(
+ "seek-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Seek to offset 3
+ err = integration.SeekSubscription("seek-sub", 3)
+ if err != nil {
+ t.Fatalf("Failed to seek subscription: %v", err)
+ }
+
+ if sub.CurrentOffset != 3 {
+ t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset)
+ }
+
+ // Subscribe from new position
+ responses, err := integration.SubscribeRecords(sub, 2)
+ if err != nil {
+ t.Fatalf("Failed to subscribe after seek: %v", err)
+ }
+
+ if len(responses) != 2 {
+ t.Errorf("Expected 2 responses after seek, got %d", len(responses))
+ }
+
+ if responses[0].Offset != 3 {
+ t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset)
+ }
+}
+
+func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription at offset 1
+ sub, err := integration.CreateSubscription(
+ "lag-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_EXACT_OFFSET,
+ 1,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Get lag
+ lag, err := integration.GetSubscriptionLag("lag-sub")
+ if err != nil {
+ t.Fatalf("Failed to get subscription lag: %v", err)
+ }
+
+ expectedLag := int64(3 - 1) // hwm - current
+ if lag != expectedLag {
+ t.Errorf("Expected lag %d, got %d", expectedLag, lag)
+ }
+
+ // Advance subscription and check lag again
+ integration.SubscribeRecords(sub, 1)
+
+ lag, err = integration.GetSubscriptionLag("lag-sub")
+ if err != nil {
+ t.Fatalf("Failed to get lag after advance: %v", err)
+ }
+
+ expectedLag = int64(3 - 2) // hwm - current
+ if lag != expectedLag {
+ t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
+ }
+}
+
+func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Create subscription
+ _, err := integration.CreateSubscription(
+ "close-sub",
+ "test-namespace", "test-topic",
+ partition,
+ schema_pb.OffsetType_RESET_TO_EARLIEST,
+ 0,
+ )
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Close subscription
+ err = integration.CloseSubscription("close-sub")
+ if err != nil {
+ t.Fatalf("Failed to close subscription: %v", err)
+ }
+
+ // Try to get lag (should fail)
+ _, err = integration.GetSubscriptionLag("close-sub")
+ if err == nil {
+ t.Error("Expected error when getting lag for closed subscription")
+ }
+}
+
+func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Publish some records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Test valid range
+ err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 2)
+ if err != nil {
+ t.Errorf("Valid range should not return error: %v", err)
+ }
+
+ // Test invalid range (beyond hwm)
+ err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 5)
+ if err == nil {
+ t.Error("Expected error for range beyond high water mark")
+ }
+}
+
+func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Test empty partition
+ offsetRange, err := integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get available range for empty partition: %v", err)
+ }
+
+ if offsetRange.Count != 0 {
+ t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
+ }
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Test with data
+ offsetRange, err = integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get available range: %v", err)
+ }
+
+ if offsetRange.StartOffset != 0 {
+ t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
+ }
+
+ if offsetRange.EndOffset != 1 {
+ t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset)
+ }
+
+ if offsetRange.Count != 2 {
+ t.Errorf("Expected count 2, got %d", offsetRange.Count)
+ }
+}
+
+func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Initial metrics
+ metrics := integration.GetOffsetMetrics()
+ if metrics.TotalOffsets != 0 {
+ t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
+ }
+
+ if metrics.ActiveSubscriptions != 0 {
+ t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions)
+ }
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscriptions
+ integration.CreateSubscription("sub1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ integration.CreateSubscription("sub2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+
+ // Check updated metrics
+ metrics = integration.GetOffsetMetrics()
+ if metrics.TotalOffsets != 2 {
+ t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets)
+ }
+
+ if metrics.ActiveSubscriptions != 2 {
+ t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions)
+ }
+
+ if metrics.PartitionCount != 1 {
+ t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
+ }
+}
+
+func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Test non-existent offset
+ info, err := integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0)
+ if err != nil {
+ t.Fatalf("Failed to get offset info: %v", err)
+ }
+
+ if info.Exists {
+ t.Error("Offset should not exist in empty partition")
+ }
+
+ // Publish record
+ integration.PublishRecord("test-namespace", "test-topic", partition, []byte("key1"), &schema_pb.RecordValue{})
+
+ // Test existing offset
+ info, err = integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0)
+ if err != nil {
+ t.Fatalf("Failed to get offset info for existing offset: %v", err)
+ }
+
+ if !info.Exists {
+ t.Error("Offset should exist after publishing")
+ }
+
+ if info.Offset != 0 {
+ t.Errorf("Expected offset 0, got %d", info.Offset)
+ }
+}
+
+func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ integration := NewSMQOffsetIntegration(storage)
+ partition := createTestPartition()
+
+ // Test empty partition
+ info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info: %v", err)
+ }
+
+ if info.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
+ }
+
+ if info.LatestOffset != -1 {
+ t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
+ }
+
+ if info.HighWaterMark != 0 {
+ t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark)
+ }
+
+ if info.RecordCount != 0 {
+ t.Errorf("Expected record count 0, got %d", info.RecordCount)
+ }
+
+ // Publish records
+ records := []PublishRecordRequest{
+ {Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
+ {Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
+ }
+ integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
+
+ // Create subscription
+ integration.CreateSubscription("test-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+
+ // Test with data
+ info, err = integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get partition offset info with data: %v", err)
+ }
+
+ if info.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
+ }
+
+ if info.LatestOffset != 2 {
+ t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
+ }
+
+ if info.HighWaterMark != 3 {
+ t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
+ }
+
+ if info.RecordCount != 3 {
+ t.Errorf("Expected record count 3, got %d", info.RecordCount)
+ }
+
+ if info.ActiveSubscriptions != 1 {
+ t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions)
+ }
+}
diff --git a/weed/mq/offset/manager.go b/weed/mq/offset/manager.go
new file mode 100644
index 000000000..01976a8bf
--- /dev/null
+++ b/weed/mq/offset/manager.go
@@ -0,0 +1,343 @@
+package offset
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// PartitionOffsetManager manages sequential offset assignment for a single partition
+type PartitionOffsetManager struct {
+ mu sync.RWMutex
+ namespace string
+ topicName string
+ partition *schema_pb.Partition
+ nextOffset int64
+
+ // Checkpointing for recovery
+ lastCheckpoint int64
+ checkpointInterval int64
+ storage OffsetStorage
+}
+
+// OffsetStorage interface for persisting offset state
+type OffsetStorage interface {
+ // SaveCheckpoint persists the current offset state for recovery
+ // Takes topic information along with partition to determine the correct storage location
+ SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error
+
+ // LoadCheckpoint retrieves the last saved offset state
+ LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error)
+
+ // GetHighestOffset scans storage to find the highest assigned offset
+ GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error)
+}
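+
+// A minimal sketch of how an OffsetStorage implementation is consumed
+// (placeholder db/partition values; SQLOffsetStorage in this package is one
+// such implementation):
+//
+// storage, _ := NewSQLOffsetStorage(db)
+// manager, _ := NewPartitionOffsetManager("ns", "topic", partition, storage)
+// next := manager.AssignOffset()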
+
+// NewPartitionOffsetManager creates a new offset manager for a partition
+func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
+ manager := &PartitionOffsetManager{
+ namespace: namespace,
+ topicName: topicName,
+ partition: partition,
+ checkpointInterval: 1, // Checkpoint every offset for immediate persistence
+ storage: storage,
+ }
+
+ // Recover offset state
+ if err := manager.recover(); err != nil {
+ return nil, fmt.Errorf("failed to recover offset state: %w", err)
+ }
+
+ return manager, nil
+}
+
+// AssignOffset assigns the next sequential offset
+func (m *PartitionOffsetManager) AssignOffset() int64 {
+ var shouldCheckpoint bool
+ var checkpointOffset int64
+
+ m.mu.Lock()
+ offset := m.nextOffset
+ m.nextOffset++
+
+ // Check if we should checkpoint (but don't do it inside the lock)
+ if offset-m.lastCheckpoint >= m.checkpointInterval {
+ shouldCheckpoint = true
+ checkpointOffset = offset
+ }
+ m.mu.Unlock()
+
+ // Checkpoint outside the lock to avoid deadlock
+ if shouldCheckpoint {
+ m.checkpoint(checkpointOffset)
+ }
+
+ return offset
+}
+
+// AssignOffsets assigns a batch of sequential offsets
+func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
+ var shouldCheckpoint bool
+ var checkpointOffset int64
+
+ m.mu.Lock()
+ baseOffset = m.nextOffset
+ lastOffset = m.nextOffset + count - 1
+ m.nextOffset += count
+
+ // Check if we should checkpoint (but don't do it inside the lock)
+ if lastOffset-m.lastCheckpoint >= m.checkpointInterval {
+ shouldCheckpoint = true
+ checkpointOffset = lastOffset
+ }
+ m.mu.Unlock()
+
+ // Checkpoint outside the lock to avoid deadlock
+ if shouldCheckpoint {
+ m.checkpoint(checkpointOffset)
+ }
+
+ return baseOffset, lastOffset
+}
+
+// GetNextOffset returns the next offset that will be assigned
+func (m *PartitionOffsetManager) GetNextOffset() int64 {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.nextOffset
+}
+
+// GetHighWaterMark returns the high water mark (next offset)
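+// For example, after offsets 0-9 have been assigned, the high water mark is 10.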
+func (m *PartitionOffsetManager) GetHighWaterMark() int64 {
+ return m.GetNextOffset()
+}
+
+// recover restores offset state from storage
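+// The next offset resumes after whichever is higher: the saved checkpoint or
+// the highest offset found by scanning storage. For example, a checkpoint at
+// 100 with stored records up to offset 149 resumes assignment at 150.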
+func (m *PartitionOffsetManager) recover() error {
+ var checkpointOffset int64 = -1
+ var highestOffset int64 = -1
+
+ // Try to load checkpoint
+ if offset, err := m.storage.LoadCheckpoint(m.namespace, m.topicName, m.partition); err == nil && offset >= 0 {
+ checkpointOffset = offset
+ }
+
+ // Try to scan storage for highest offset
+ if offset, err := m.storage.GetHighestOffset(m.namespace, m.topicName, m.partition); err == nil && offset >= 0 {
+ highestOffset = offset
+ }
+
+ // Use the higher of checkpoint or storage scan
+ if checkpointOffset >= 0 && highestOffset >= 0 {
+ if highestOffset > checkpointOffset {
+ m.nextOffset = highestOffset + 1
+ m.lastCheckpoint = highestOffset
+ } else {
+ m.nextOffset = checkpointOffset + 1
+ m.lastCheckpoint = checkpointOffset
+ }
+ } else if checkpointOffset >= 0 {
+ m.nextOffset = checkpointOffset + 1
+ m.lastCheckpoint = checkpointOffset
+ } else if highestOffset >= 0 {
+ m.nextOffset = highestOffset + 1
+ m.lastCheckpoint = highestOffset
+ } else {
+ // No data exists, start from 0
+ m.nextOffset = 0
+ m.lastCheckpoint = -1
+ }
+
+ return nil
+}
+
+// checkpoint saves the current offset state
+func (m *PartitionOffsetManager) checkpoint(offset int64) {
+ if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, offset); err != nil {
+ // Log error but don't fail - checkpointing is for optimization
+ fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err)
+ return
+ }
+
+ m.mu.Lock()
+ m.lastCheckpoint = offset
+ m.mu.Unlock()
+}
+
+// PartitionOffsetRegistry manages offset managers for multiple partitions
+type PartitionOffsetRegistry struct {
+ mu sync.RWMutex
+ managers map[string]*PartitionOffsetManager
+ storage OffsetStorage
+}
+
+// NewPartitionOffsetRegistry creates a new registry
+func NewPartitionOffsetRegistry(storage OffsetStorage) *PartitionOffsetRegistry {
+ return &PartitionOffsetRegistry{
+ managers: make(map[string]*PartitionOffsetManager),
+ storage: storage,
+ }
+}
+
+// GetManager returns the offset manager for a partition, creating it if needed
+func (r *PartitionOffsetRegistry) GetManager(namespace, topicName string, partition *schema_pb.Partition) (*PartitionOffsetManager, error) {
+ // CRITICAL FIX: Use TopicPartitionKey to ensure each topic has its own offset manager
+ key := TopicPartitionKey(namespace, topicName, partition)
+
+ r.mu.RLock()
+ manager, exists := r.managers[key]
+ r.mu.RUnlock()
+
+ if exists {
+ return manager, nil
+ }
+
+ // Create new manager
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ // Double-check after acquiring write lock
+ if manager, exists := r.managers[key]; exists {
+ return manager, nil
+ }
+
+ manager, err := NewPartitionOffsetManager(namespace, topicName, partition, r.storage)
+ if err != nil {
+ return nil, err
+ }
+
+ r.managers[key] = manager
+ return manager, nil
+}
+
+// AssignOffset assigns an offset for the given partition
+func (r *PartitionOffsetRegistry) AssignOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ manager, err := r.GetManager(namespace, topicName, partition)
+ if err != nil {
+ return 0, err
+ }
+
+ assignedOffset := manager.AssignOffset()
+
+ return assignedOffset, nil
+}
+
+// AssignOffsets assigns a batch of offsets for the given partition
+func (r *PartitionOffsetRegistry) AssignOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) (baseOffset, lastOffset int64, err error) {
+ manager, err := r.GetManager(namespace, topicName, partition)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ baseOffset, lastOffset = manager.AssignOffsets(count)
+ return baseOffset, lastOffset, nil
+}
+
+// GetHighWaterMark returns the high water mark for a partition
+func (r *PartitionOffsetRegistry) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ manager, err := r.GetManager(namespace, topicName, partition)
+ if err != nil {
+ return 0, err
+ }
+
+ return manager.GetHighWaterMark(), nil
+}
+
+// TopicPartitionKey generates a unique key for a topic-partition combination
+// This is the canonical key format used across the offset management system
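+// For example, namespace "test-namespace" and topic "test-topic" on a partition
+// with ring size 1024 and range 0-31 produce "test-namespace/test-topic/ring:1024:range:0-31".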
+func TopicPartitionKey(namespace, topicName string, partition *schema_pb.Partition) string {
+ return fmt.Sprintf("%s/%s/ring:%d:range:%d-%d",
+ namespace, topicName,
+ partition.RingSize, partition.RangeStart, partition.RangeStop)
+}
+
+// PartitionKey generates a unique key for a partition (without topic context)
+// Note: UnixTimeNs is intentionally excluded from the key because it represents
+// partition creation time, not partition identity. Using it would cause offset
+// tracking to reset whenever a partition is recreated or looked up again.
+// DEPRECATED: Use TopicPartitionKey for production code to avoid key collisions
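+// For example, a partition with ring size 1024 and range 0-31 produces "ring:1024:range:0-31".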
+func PartitionKey(partition *schema_pb.Partition) string {
+ return fmt.Sprintf("ring:%d:range:%d-%d",
+ partition.RingSize, partition.RangeStart, partition.RangeStop)
+}
+
+// partitionKey is the internal lowercase version for backward compatibility within this package
+func partitionKey(partition *schema_pb.Partition) string {
+ return PartitionKey(partition)
+}
+
+// OffsetAssignment represents an assigned offset with metadata
+type OffsetAssignment struct {
+ Offset int64
+ Timestamp int64
+ Partition *schema_pb.Partition
+}
+
+// BatchOffsetAssignment represents a batch of assigned offsets
+type BatchOffsetAssignment struct {
+ BaseOffset int64
+ LastOffset int64
+ Count int64
+ Timestamp int64
+ Partition *schema_pb.Partition
+}
+
+// AssignmentResult contains the result of offset assignment
+type AssignmentResult struct {
+ Assignment *OffsetAssignment
+ Batch *BatchOffsetAssignment
+ Error error
+}
+
+// OffsetAssigner provides high-level offset assignment operations
+type OffsetAssigner struct {
+ registry *PartitionOffsetRegistry
+}
+
+// NewOffsetAssigner creates a new offset assigner
+func NewOffsetAssigner(storage OffsetStorage) *OffsetAssigner {
+ return &OffsetAssigner{
+ registry: NewPartitionOffsetRegistry(storage),
+ }
+}
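+
+// Usage sketch (placeholder storage and partition values):
+//
+// assigner := NewOffsetAssigner(storage)
+// result := assigner.AssignBatchOffsets("ns", "topic", partition, 5)
+// if result.Error == nil {
+//  // result.Batch covers offsets BaseOffset through LastOffset
+// }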
+
+// AssignSingleOffset assigns a single offset with timestamp
+func (a *OffsetAssigner) AssignSingleOffset(namespace, topicName string, partition *schema_pb.Partition) *AssignmentResult {
+ offset, err := a.registry.AssignOffset(namespace, topicName, partition)
+ if err != nil {
+ return &AssignmentResult{Error: err}
+ }
+
+ return &AssignmentResult{
+ Assignment: &OffsetAssignment{
+ Offset: offset,
+ Timestamp: time.Now().UnixNano(),
+ Partition: partition,
+ },
+ }
+}
+
+// AssignBatchOffsets assigns a batch of offsets with timestamp
+func (a *OffsetAssigner) AssignBatchOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) *AssignmentResult {
+ baseOffset, lastOffset, err := a.registry.AssignOffsets(namespace, topicName, partition, count)
+ if err != nil {
+ return &AssignmentResult{Error: err}
+ }
+
+ return &AssignmentResult{
+ Batch: &BatchOffsetAssignment{
+ BaseOffset: baseOffset,
+ LastOffset: lastOffset,
+ Count: count,
+ Timestamp: time.Now().UnixNano(),
+ Partition: partition,
+ },
+ }
+}
+
+// GetHighWaterMark returns the high water mark for a partition
+func (a *OffsetAssigner) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ return a.registry.GetHighWaterMark(namespace, topicName, partition)
+}
diff --git a/weed/mq/offset/manager_test.go b/weed/mq/offset/manager_test.go
new file mode 100644
index 000000000..0db301e84
--- /dev/null
+++ b/weed/mq/offset/manager_test.go
@@ -0,0 +1,388 @@
+package offset
+
+import (
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func createTestPartition() *schema_pb.Partition {
+ return &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+}
+
+func TestPartitionOffsetManager_BasicAssignment(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ partition := createTestPartition()
+
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ t.Fatalf("Failed to create offset manager: %v", err)
+ }
+
+ // Test sequential offset assignment
+ for i := int64(0); i < 10; i++ {
+ offset := manager.AssignOffset()
+ if offset != i {
+ t.Errorf("Expected offset %d, got %d", i, offset)
+ }
+ }
+
+ // Test high water mark
+ hwm := manager.GetHighWaterMark()
+ if hwm != 10 {
+ t.Errorf("Expected high water mark 10, got %d", hwm)
+ }
+}
+
+func TestPartitionOffsetManager_BatchAssignment(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ partition := createTestPartition()
+
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ t.Fatalf("Failed to create offset manager: %v", err)
+ }
+
+ // Assign batch of 5 offsets
+ baseOffset, lastOffset := manager.AssignOffsets(5)
+ if baseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", baseOffset)
+ }
+ if lastOffset != 4 {
+ t.Errorf("Expected last offset 4, got %d", lastOffset)
+ }
+
+ // Assign another batch
+ baseOffset, lastOffset = manager.AssignOffsets(3)
+ if baseOffset != 5 {
+ t.Errorf("Expected base offset 5, got %d", baseOffset)
+ }
+ if lastOffset != 7 {
+ t.Errorf("Expected last offset 7, got %d", lastOffset)
+ }
+
+ // Check high water mark
+ hwm := manager.GetHighWaterMark()
+ if hwm != 8 {
+ t.Errorf("Expected high water mark 8, got %d", hwm)
+ }
+}
+
+func TestPartitionOffsetManager_Recovery(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ partition := createTestPartition()
+
+ // Create manager and assign some offsets
+ manager1, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ t.Fatalf("Failed to create offset manager: %v", err)
+ }
+
+ // Assign offsets and simulate records
+ for i := 0; i < 150; i++ { // enough assignments to exercise both checkpoint and storage-scan recovery
+ offset := manager1.AssignOffset()
+ storage.AddRecord("test-namespace", "test-topic", partition, offset)
+ }
+
+ // Checkpointing happens synchronously inside AssignOffset; the short sleep is
+ // only a safety margin before simulating a restart.
+ time.Sleep(100 * time.Millisecond)
+
+ // Create new manager (simulates restart)
+ manager2, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ t.Fatalf("Failed to create offset manager after recovery: %v", err)
+ }
+
+ // Recovery takes the higher of the saved checkpoint and the highest offset
+ // found in storage. The manager checkpoints on every assignment
+ // (checkpointInterval is 1) and records exist up to offset 149, so the next
+ // assigned offset after restart must be 150.
+ nextOffset := manager2.AssignOffset()
+ if nextOffset != 150 {
+ t.Errorf("Expected next offset 150 after recovery, got %d", nextOffset)
+ }
+}
+
+func TestPartitionOffsetManager_RecoveryFromStorage(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ partition := createTestPartition()
+
+ // Simulate existing records in storage without checkpoint
+ for i := int64(0); i < 50; i++ {
+ storage.AddRecord("test-namespace", "test-topic", partition, i)
+ }
+
+ // Create manager - should recover from storage scan
+ manager, err := NewPartitionOffsetManager("test-namespace", "test-topic", partition, storage)
+ if err != nil {
+ t.Fatalf("Failed to create offset manager: %v", err)
+ }
+
+ // Next offset should be 50
+ nextOffset := manager.AssignOffset()
+ if nextOffset != 50 {
+ t.Errorf("Expected next offset 50 after storage recovery, got %d", nextOffset)
+ }
+}
+
+func TestPartitionOffsetRegistry_MultiplePartitions(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+
+ // Create different partitions
+ partition1 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ partition2 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 32,
+ RangeStop: 63,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ // Assign offsets to different partitions
+ offset1, err := registry.AssignOffset("test-namespace", "test-topic", partition1)
+ if err != nil {
+ t.Fatalf("Failed to assign offset to partition1: %v", err)
+ }
+ if offset1 != 0 {
+ t.Errorf("Expected offset 0 for partition1, got %d", offset1)
+ }
+
+ offset2, err := registry.AssignOffset("test-namespace", "test-topic", partition2)
+ if err != nil {
+ t.Fatalf("Failed to assign offset to partition2: %v", err)
+ }
+ if offset2 != 0 {
+ t.Errorf("Expected offset 0 for partition2, got %d", offset2)
+ }
+
+ // Assign more offsets to partition1
+ offset1_2, err := registry.AssignOffset("test-namespace", "test-topic", partition1)
+ if err != nil {
+ t.Fatalf("Failed to assign second offset to partition1: %v", err)
+ }
+ if offset1_2 != 1 {
+ t.Errorf("Expected offset 1 for partition1, got %d", offset1_2)
+ }
+
+ // Partition2 should still be at 0 for next assignment
+ offset2_2, err := registry.AssignOffset("test-namespace", "test-topic", partition2)
+ if err != nil {
+ t.Fatalf("Failed to assign second offset to partition2: %v", err)
+ }
+ if offset2_2 != 1 {
+ t.Errorf("Expected offset 1 for partition2, got %d", offset2_2)
+ }
+}
+
+func TestPartitionOffsetRegistry_BatchAssignment(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ partition := createTestPartition()
+
+ // Assign batch of offsets
+ baseOffset, lastOffset, err := registry.AssignOffsets("test-namespace", "test-topic", partition, 10)
+ if err != nil {
+ t.Fatalf("Failed to assign batch offsets: %v", err)
+ }
+
+ if baseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", baseOffset)
+ }
+ if lastOffset != 9 {
+ t.Errorf("Expected last offset 9, got %d", lastOffset)
+ }
+
+ // Get high water mark
+ hwm, err := registry.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark: %v", err)
+ }
+ if hwm != 10 {
+ t.Errorf("Expected high water mark 10, got %d", hwm)
+ }
+}
+
+func TestOffsetAssigner_SingleAssignment(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ assigner := NewOffsetAssigner(storage)
+ partition := createTestPartition()
+
+ // Assign single offset
+ result := assigner.AssignSingleOffset("test-namespace", "test-topic", partition)
+ if result.Error != nil {
+ t.Fatalf("Failed to assign single offset: %v", result.Error)
+ }
+
+ if result.Assignment == nil {
+ t.Fatal("Assignment result is nil")
+ }
+
+ if result.Assignment.Offset != 0 {
+ t.Errorf("Expected offset 0, got %d", result.Assignment.Offset)
+ }
+
+ if result.Assignment.Partition != partition {
+ t.Error("Partition mismatch in assignment")
+ }
+
+ if result.Assignment.Timestamp <= 0 {
+ t.Error("Timestamp should be set")
+ }
+}
+
+func TestOffsetAssigner_BatchAssignment(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ assigner := NewOffsetAssigner(storage)
+ partition := createTestPartition()
+
+ // Assign batch of offsets
+ result := assigner.AssignBatchOffsets("test-namespace", "test-topic", partition, 5)
+ if result.Error != nil {
+ t.Fatalf("Failed to assign batch offsets: %v", result.Error)
+ }
+
+ if result.Batch == nil {
+ t.Fatal("Batch result is nil")
+ }
+
+ if result.Batch.BaseOffset != 0 {
+ t.Errorf("Expected base offset 0, got %d", result.Batch.BaseOffset)
+ }
+
+ if result.Batch.LastOffset != 4 {
+ t.Errorf("Expected last offset 4, got %d", result.Batch.LastOffset)
+ }
+
+ if result.Batch.Count != 5 {
+ t.Errorf("Expected count 5, got %d", result.Batch.Count)
+ }
+
+ if result.Batch.Timestamp <= 0 {
+ t.Error("Timestamp should be set")
+ }
+}
+
+func TestOffsetAssigner_HighWaterMark(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ assigner := NewOffsetAssigner(storage)
+ partition := createTestPartition()
+
+ // Initially should be 0
+ hwm, err := assigner.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get initial high water mark: %v", err)
+ }
+ if hwm != 0 {
+ t.Errorf("Expected initial high water mark 0, got %d", hwm)
+ }
+
+ // Assign some offsets
+ assigner.AssignBatchOffsets("test-namespace", "test-topic", partition, 10)
+
+ // High water mark should be updated
+ hwm, err = assigner.GetHighWaterMark("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get high water mark after assignment: %v", err)
+ }
+ if hwm != 10 {
+ t.Errorf("Expected high water mark 10, got %d", hwm)
+ }
+}
+
+func TestPartitionKey(t *testing.T) {
+ partition1 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: 1234567890,
+ }
+
+ partition2 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: 1234567890,
+ }
+
+ partition3 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 32,
+ RangeStop: 63,
+ UnixTimeNs: 1234567890,
+ }
+
+ key1 := partitionKey(partition1)
+ key2 := partitionKey(partition2)
+ key3 := partitionKey(partition3)
+
+ // Same partitions should have same key
+ if key1 != key2 {
+ t.Errorf("Same partitions should have same key: %s vs %s", key1, key2)
+ }
+
+ // Different partitions should have different keys
+ if key1 == key3 {
+ t.Errorf("Different partitions should have different keys: %s vs %s", key1, key3)
+ }
+}
+
+func TestConcurrentOffsetAssignment(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ partition := createTestPartition()
+
+ const numGoroutines = 10
+ const offsetsPerGoroutine = 100
+
+ results := make(chan int64, numGoroutines*offsetsPerGoroutine)
+
+ // Start concurrent offset assignments
+ for i := 0; i < numGoroutines; i++ {
+ go func() {
+ for j := 0; j < offsetsPerGoroutine; j++ {
+ offset, err := registry.AssignOffset("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Errorf("Failed to assign offset: %v", err)
+ return
+ }
+ results <- offset
+ }
+ }()
+ }
+
+ // Collect all results
+ offsets := make(map[int64]bool)
+ for i := 0; i < numGoroutines*offsetsPerGoroutine; i++ {
+ offset := <-results
+ if offsets[offset] {
+ t.Errorf("Duplicate offset assigned: %d", offset)
+ }
+ offsets[offset] = true
+ }
+
+ // Verify we got all expected offsets
+ expectedCount := numGoroutines * offsetsPerGoroutine
+ if len(offsets) != expectedCount {
+ t.Errorf("Expected %d unique offsets, got %d", expectedCount, len(offsets))
+ }
+
+ // Verify offsets are in expected range
+ for offset := range offsets {
+ if offset < 0 || offset >= int64(expectedCount) {
+ t.Errorf("Offset %d is out of expected range [0, %d)", offset, expectedCount)
+ }
+ }
+}
diff --git a/weed/mq/offset/memory_storage_test.go b/weed/mq/offset/memory_storage_test.go
new file mode 100644
index 000000000..4434e1eb6
--- /dev/null
+++ b/weed/mq/offset/memory_storage_test.go
@@ -0,0 +1,228 @@
+package offset
+
+import (
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// recordEntry holds a record with timestamp for TTL cleanup
+type recordEntry struct {
+ exists bool
+ timestamp time.Time
+}
+
+// InMemoryOffsetStorage provides an in-memory implementation of OffsetStorage for testing ONLY
+// WARNING: This should NEVER be used in production - use FilerOffsetStorage or SQLOffsetStorage instead
+type InMemoryOffsetStorage struct {
+ mu sync.RWMutex
+ checkpoints map[string]int64 // partition key -> offset
+ records map[string]map[int64]*recordEntry // partition key -> offset -> entry with timestamp
+
+ // Memory leak protection
+ maxRecordsPerPartition int // Maximum records to keep per partition
+ recordTTL time.Duration // TTL for record entries
+ lastCleanup time.Time // Last cleanup time
+ cleanupInterval time.Duration // How often to run cleanup
+}
+
+// NewInMemoryOffsetStorage creates a new in-memory storage with memory leak protection
+// FOR TESTING ONLY - do not use in production
+func NewInMemoryOffsetStorage() *InMemoryOffsetStorage {
+ return &InMemoryOffsetStorage{
+ checkpoints: make(map[string]int64),
+ records: make(map[string]map[int64]*recordEntry),
+ maxRecordsPerPartition: 10000, // Limit to 10K records per partition
+ recordTTL: 1 * time.Hour, // Records expire after 1 hour
+ cleanupInterval: 5 * time.Minute, // Cleanup every 5 minutes
+ lastCleanup: time.Now(),
+ }
+}
+
+// SaveCheckpoint saves the checkpoint for a partition
+func (s *InMemoryOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Use TopicPartitionKey for consistency with other storage implementations
+ key := TopicPartitionKey(namespace, topicName, partition)
+ s.checkpoints[key] = offset
+ return nil
+}
+
+// LoadCheckpoint loads the checkpoint for a partition
+func (s *InMemoryOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // Use TopicPartitionKey to match SaveCheckpoint
+ key := TopicPartitionKey(namespace, topicName, partition)
+ offset, exists := s.checkpoints[key]
+ if !exists {
+ return -1, fmt.Errorf("no checkpoint found")
+ }
+
+ return offset, nil
+}
+
+// GetHighestOffset finds the highest offset in storage for a partition
+func (s *InMemoryOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // Use TopicPartitionKey to match SaveCheckpoint
+ key := TopicPartitionKey(namespace, topicName, partition)
+ offsets, exists := s.records[key]
+ if !exists || len(offsets) == 0 {
+ return -1, fmt.Errorf("no records found")
+ }
+
+ var highest int64 = -1
+ for offset, entry := range offsets {
+ if entry.exists && offset > highest {
+ highest = offset
+ }
+ }
+
+ return highest, nil
+}
+
+// AddRecord simulates storing a record with an offset (for testing)
+func (s *InMemoryOffsetStorage) AddRecord(namespace, topicName string, partition *schema_pb.Partition, offset int64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Use TopicPartitionKey to match GetHighestOffset
+ key := TopicPartitionKey(namespace, topicName, partition)
+ if s.records[key] == nil {
+ s.records[key] = make(map[int64]*recordEntry)
+ }
+
+ // Add record with current timestamp
+ s.records[key][offset] = &recordEntry{
+ exists: true,
+ timestamp: time.Now(),
+ }
+
+ // Trigger cleanup if needed (memory leak protection)
+ s.cleanupIfNeeded()
+}
+
+// GetRecordCount returns the number of records for a partition (for testing)
+func (s *InMemoryOffsetStorage) GetRecordCount(namespace, topicName string, partition *schema_pb.Partition) int {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // Use TopicPartitionKey to match GetHighestOffset
+ key := TopicPartitionKey(namespace, topicName, partition)
+ if offsets, exists := s.records[key]; exists {
+ count := 0
+ for _, entry := range offsets {
+ if entry.exists {
+ count++
+ }
+ }
+ return count
+ }
+ return 0
+}
+
+// Clear removes all data (for testing)
+func (s *InMemoryOffsetStorage) Clear() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.checkpoints = make(map[string]int64)
+ s.records = make(map[string]map[int64]*recordEntry)
+ s.lastCleanup = time.Now()
+}
+
+// Reset removes all data (implements resettable interface for shutdown)
+func (s *InMemoryOffsetStorage) Reset() error {
+ s.Clear()
+ return nil
+}
+
+// cleanupIfNeeded performs memory leak protection cleanup
+// This method assumes the caller already holds the write lock
+func (s *InMemoryOffsetStorage) cleanupIfNeeded() {
+ now := time.Now()
+
+ // Only cleanup if enough time has passed
+ if now.Sub(s.lastCleanup) < s.cleanupInterval {
+ return
+ }
+
+ s.lastCleanup = now
+ cutoff := now.Add(-s.recordTTL)
+
+ // Clean up expired records and enforce size limits
+ for partitionKey, offsets := range s.records {
+ // Remove expired records
+ for offset, entry := range offsets {
+ if entry.timestamp.Before(cutoff) {
+ delete(offsets, offset)
+ }
+ }
+
+ // Enforce size limit per partition
+ if len(offsets) > s.maxRecordsPerPartition {
+ // Keep only the most recent records
+ type offsetTime struct {
+ offset int64
+ time time.Time
+ }
+
+ var entries []offsetTime
+ for offset, entry := range offsets {
+ entries = append(entries, offsetTime{offset: offset, time: entry.timestamp})
+ }
+
+ // Sort by timestamp (newest first)
+ sort.Slice(entries, func(i, j int) bool {
+ return entries[i].time.After(entries[j].time)
+ })
+
+ // Keep only the newest maxRecordsPerPartition entries
+ newOffsets := make(map[int64]*recordEntry)
+ for i := 0; i < s.maxRecordsPerPartition && i < len(entries); i++ {
+ offset := entries[i].offset
+ newOffsets[offset] = offsets[offset]
+ }
+
+ s.records[partitionKey] = newOffsets
+ }
+
+ // Remove empty partition maps
+ if len(offsets) == 0 {
+ delete(s.records, partitionKey)
+ }
+ }
+}
+
+// GetMemoryStats returns memory usage statistics for monitoring
+func (s *InMemoryOffsetStorage) GetMemoryStats() map[string]interface{} {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ totalRecords := 0
+ partitionCount := len(s.records)
+
+ for _, offsets := range s.records {
+ totalRecords += len(offsets)
+ }
+
+ return map[string]interface{}{
+ "total_partitions": partitionCount,
+ "total_records": totalRecords,
+ "max_records_per_partition": s.maxRecordsPerPartition,
+ "record_ttl_hours": s.recordTTL.Hours(),
+ "last_cleanup": s.lastCleanup,
+ }
+}
diff --git a/weed/mq/offset/migration.go b/weed/mq/offset/migration.go
new file mode 100644
index 000000000..106129206
--- /dev/null
+++ b/weed/mq/offset/migration.go
@@ -0,0 +1,302 @@
+package offset
+
+import (
+ "database/sql"
+ "fmt"
+ "time"
+)
+
+// MigrationVersion represents a database migration version
+type MigrationVersion struct {
+ Version int
+ Description string
+ SQL string
+}
+
+// GetMigrations returns all available migrations for offset storage
+func GetMigrations() []MigrationVersion {
+ return []MigrationVersion{
+ {
+ Version: 1,
+ Description: "Create initial offset storage tables",
+ SQL: `
+ -- Partition offset checkpoints table
+ -- TODO: Add _index as computed column when supported by database
+ CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
+ partition_key TEXT PRIMARY KEY,
+ ring_size INTEGER NOT NULL,
+ range_start INTEGER NOT NULL,
+ range_stop INTEGER NOT NULL,
+ unix_time_ns INTEGER NOT NULL,
+ checkpoint_offset INTEGER NOT NULL,
+ updated_at INTEGER NOT NULL
+ );
+
+ -- Offset mappings table for detailed tracking
+ -- TODO: Add _index as computed column when supported by database
+ CREATE TABLE IF NOT EXISTS offset_mappings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ partition_key TEXT NOT NULL,
+ kafka_offset INTEGER NOT NULL,
+ smq_timestamp INTEGER NOT NULL,
+ message_size INTEGER NOT NULL,
+ created_at INTEGER NOT NULL,
+ UNIQUE(partition_key, kafka_offset)
+ );
+
+ -- Schema migrations tracking table
+ CREATE TABLE IF NOT EXISTS schema_migrations (
+ version INTEGER PRIMARY KEY,
+ description TEXT NOT NULL,
+ applied_at INTEGER NOT NULL
+ );
+ `,
+ },
+ {
+ Version: 2,
+ Description: "Add indexes for performance optimization",
+ SQL: `
+ -- Indexes for performance
+ CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
+ ON partition_offset_checkpoints(partition_key);
+
+ CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
+ ON offset_mappings(partition_key, kafka_offset);
+
+ CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
+ ON offset_mappings(partition_key, smq_timestamp);
+
+ CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at
+ ON offset_mappings(created_at);
+ `,
+ },
+ {
+ Version: 3,
+ Description: "Add partition metadata table for enhanced tracking",
+ SQL: `
+ -- Partition metadata table
+ CREATE TABLE IF NOT EXISTS partition_metadata (
+ partition_key TEXT PRIMARY KEY,
+ ring_size INTEGER NOT NULL,
+ range_start INTEGER NOT NULL,
+ range_stop INTEGER NOT NULL,
+ unix_time_ns INTEGER NOT NULL,
+ created_at INTEGER NOT NULL,
+ last_activity_at INTEGER NOT NULL,
+ record_count INTEGER DEFAULT 0,
+ total_size INTEGER DEFAULT 0
+ );
+
+ -- Index for partition metadata
+ CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity
+ ON partition_metadata(last_activity_at);
+ `,
+ },
+ }
+}
+
+// MigrationManager handles database schema migrations
+type MigrationManager struct {
+ db *sql.DB
+}
+
+// NewMigrationManager creates a new migration manager
+func NewMigrationManager(db *sql.DB) *MigrationManager {
+ return &MigrationManager{db: db}
+}
+
+// GetCurrentVersion returns the current schema version
+func (m *MigrationManager) GetCurrentVersion() (int, error) {
+ // First, ensure the migrations table exists
+ _, err := m.db.Exec(`
+ CREATE TABLE IF NOT EXISTS schema_migrations (
+ version INTEGER PRIMARY KEY,
+ description TEXT NOT NULL,
+ applied_at INTEGER NOT NULL
+ )
+ `)
+ if err != nil {
+ return 0, fmt.Errorf("failed to create migrations table: %w", err)
+ }
+
+ var version sql.NullInt64
+ err = m.db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&version)
+ if err != nil {
+ return 0, fmt.Errorf("failed to get current version: %w", err)
+ }
+
+ if !version.Valid {
+ return 0, nil // No migrations applied yet
+ }
+
+ return int(version.Int64), nil
+}
+
+// ApplyMigrations applies all pending migrations
+func (m *MigrationManager) ApplyMigrations() error {
+ currentVersion, err := m.GetCurrentVersion()
+ if err != nil {
+ return fmt.Errorf("failed to get current version: %w", err)
+ }
+
+ migrations := GetMigrations()
+
+ for _, migration := range migrations {
+ if migration.Version <= currentVersion {
+ continue // Already applied
+ }
+
+ fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description)
+
+ // Begin transaction
+ tx, err := m.db.Begin()
+ if err != nil {
+ return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err)
+ }
+
+ // Execute migration SQL
+ _, err = tx.Exec(migration.SQL)
+ if err != nil {
+ tx.Rollback()
+ return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err)
+ }
+
+ // Record migration as applied
+ _, err = tx.Exec(
+ "INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
+ migration.Version,
+ migration.Description,
+ getCurrentTimestamp(),
+ )
+ if err != nil {
+ tx.Rollback()
+ return fmt.Errorf("failed to record migration %d: %w", migration.Version, err)
+ }
+
+ // Commit transaction
+ err = tx.Commit()
+ if err != nil {
+ return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err)
+ }
+
+ fmt.Printf("Successfully applied migration %d\n", migration.Version)
+ }
+
+ return nil
+}
+
+// RollbackMigration rolls back a specific migration (if supported)
+func (m *MigrationManager) RollbackMigration(version int) error {
+ // TODO: Implement rollback functionality
+ // ASSUMPTION: For now, rollbacks are not supported as they require careful planning
+ return fmt.Errorf("migration rollbacks not implemented - manual intervention required")
+}
+
+// GetAppliedMigrations returns a list of all applied migrations
+func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) {
+ rows, err := m.db.Query(`
+ SELECT version, description, applied_at
+ FROM schema_migrations
+ ORDER BY version
+ `)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query applied migrations: %w", err)
+ }
+ defer rows.Close()
+
+ var migrations []AppliedMigration
+ for rows.Next() {
+ var migration AppliedMigration
+ err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt)
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan migration: %w", err)
+ }
+ migrations = append(migrations, migration)
+ }
+
+ return migrations, nil
+}
+
+// ValidateSchema validates that the database schema is up to date
+func (m *MigrationManager) ValidateSchema() error {
+ currentVersion, err := m.GetCurrentVersion()
+ if err != nil {
+ return fmt.Errorf("failed to get current version: %w", err)
+ }
+
+ migrations := GetMigrations()
+ if len(migrations) == 0 {
+ return nil
+ }
+
+ latestVersion := migrations[len(migrations)-1].Version
+ if currentVersion < latestVersion {
+ return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion)
+ }
+
+ return nil
+}
+
+// AppliedMigration represents a migration that has been applied
+type AppliedMigration struct {
+ Version int
+ Description string
+ AppliedAt int64
+}
+
+// getCurrentTimestamp returns the current timestamp in nanoseconds
+func getCurrentTimestamp() int64 {
+ return time.Now().UnixNano()
+}
+
+// CreateDatabase creates and initializes a new offset storage database
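+// Example (hypothetical path): db, err := CreateDatabase("/tmp/offsets.db")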
+func CreateDatabase(dbPath string) (*sql.DB, error) {
+ // TODO: Support different database types (PostgreSQL, MySQL, etc.)
+ // ASSUMPTION: Using SQLite for now, can be extended for other databases
+
+ db, err := sql.Open("sqlite3", dbPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open database: %w", err)
+ }
+
+ // Configure SQLite for better performance
+ pragmas := []string{
+ "PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency
+ "PRAGMA synchronous=NORMAL", // Balance between safety and performance
+ "PRAGMA cache_size=10000", // Increase cache size
+ "PRAGMA foreign_keys=ON", // Enable foreign key constraints
+ "PRAGMA temp_store=MEMORY", // Store temporary tables in memory
+ }
+
+ for _, pragma := range pragmas {
+ _, err := db.Exec(pragma)
+ if err != nil {
+ db.Close()
+ return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err)
+ }
+ }
+
+ // Apply migrations
+ migrationManager := NewMigrationManager(db)
+ err = migrationManager.ApplyMigrations()
+ if err != nil {
+ db.Close()
+ return nil, fmt.Errorf("failed to apply migrations: %w", err)
+ }
+
+ return db, nil
+}
+
+// BackupDatabase creates a backup of the offset storage database
+func BackupDatabase(sourceDB *sql.DB, backupPath string) error {
+ // TODO: Implement database backup functionality
+ // ASSUMPTION: This would use database-specific backup mechanisms
+ return fmt.Errorf("database backup not implemented yet")
+}
+
+// RestoreDatabase restores a database from a backup
+func RestoreDatabase(backupPath, targetPath string) error {
+ // TODO: Implement database restore functionality
+ // ASSUMPTION: This would use database-specific restore mechanisms
+ return fmt.Errorf("database restore not implemented yet")
+}
diff --git a/weed/mq/offset/sql_storage.go b/weed/mq/offset/sql_storage.go
new file mode 100644
index 000000000..c3107e5a4
--- /dev/null
+++ b/weed/mq/offset/sql_storage.go
@@ -0,0 +1,394 @@
+package offset
+
+import (
+ "database/sql"
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// OffsetEntry represents a mapping between Kafka offset and SMQ timestamp
+type OffsetEntry struct {
+ KafkaOffset int64
+ SMQTimestamp int64
+ MessageSize int32
+}
+
+// SQLOffsetStorage implements OffsetStorage using SQL database with _index column
+type SQLOffsetStorage struct {
+ db *sql.DB
+}
+
+// NewSQLOffsetStorage creates a new SQL-based offset storage
+func NewSQLOffsetStorage(db *sql.DB) (*SQLOffsetStorage, error) {
+ storage := &SQLOffsetStorage{db: db}
+
+ // Initialize database schema
+ if err := storage.initializeSchema(); err != nil {
+ return nil, fmt.Errorf("failed to initialize schema: %w", err)
+ }
+
+ return storage, nil
+}
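+
+// Typical wiring (sketch, hypothetical path): create or open the database with
+// CreateDatabase from this package, then wrap it:
+//
+// db, _ := CreateDatabase("offsets.db")
+// storage, _ := NewSQLOffsetStorage(db)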
+
+// initializeSchema creates the necessary tables for offset storage
+func (s *SQLOffsetStorage) initializeSchema() error {
+ // TODO: Create offset storage tables with _index as hidden column
+ // ASSUMPTION: Using SQLite-compatible syntax, may need adaptation for other databases
+
+ queries := []string{
+ // Partition offset checkpoints table
+ // TODO: Add _index as computed column when supported by database
+ // ASSUMPTION: Using regular columns for now, _index concept preserved for future enhancement
+ `CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
+ partition_key TEXT PRIMARY KEY,
+ ring_size INTEGER NOT NULL,
+ range_start INTEGER NOT NULL,
+ range_stop INTEGER NOT NULL,
+ unix_time_ns INTEGER NOT NULL,
+ checkpoint_offset INTEGER NOT NULL,
+ updated_at INTEGER NOT NULL
+ )`,
+
+ // Offset mappings table for detailed tracking
+ // TODO: Add _index as computed column when supported by database
+ `CREATE TABLE IF NOT EXISTS offset_mappings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ partition_key TEXT NOT NULL,
+ kafka_offset INTEGER NOT NULL,
+ smq_timestamp INTEGER NOT NULL,
+ message_size INTEGER NOT NULL,
+ created_at INTEGER NOT NULL,
+ UNIQUE(partition_key, kafka_offset)
+ )`,
+
+ // Indexes for performance
+ `CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
+ ON partition_offset_checkpoints(partition_key)`,
+
+ `CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
+ ON offset_mappings(partition_key, kafka_offset)`,
+
+ `CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
+ ON offset_mappings(partition_key, smq_timestamp)`,
+ }
+
+ for _, query := range queries {
+ if _, err := s.db.Exec(query); err != nil {
+ return fmt.Errorf("failed to execute schema query: %w", err)
+ }
+ }
+
+ return nil
+}
+
+// SaveCheckpoint saves the checkpoint for a partition
+func (s *SQLOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
+ // Use TopicPartitionKey to ensure each topic has isolated checkpoint storage
+ partitionKey := TopicPartitionKey(namespace, topicName, partition)
+ now := time.Now().UnixNano()
+
+ // TODO: Use UPSERT for better performance
+ // ASSUMPTION: SQLite REPLACE syntax, may need adaptation for other databases
+ query := `
+ REPLACE INTO partition_offset_checkpoints
+ (partition_key, ring_size, range_start, range_stop, unix_time_ns, checkpoint_offset, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ `
+
+ _, err := s.db.Exec(query,
+ partitionKey,
+ partition.RingSize,
+ partition.RangeStart,
+ partition.RangeStop,
+ partition.UnixTimeNs,
+ offset,
+ now,
+ )
+
+ if err != nil {
+ return fmt.Errorf("failed to save checkpoint: %w", err)
+ }
+
+ return nil
+}
+
+// LoadCheckpoint loads the checkpoint for a partition
+func (s *SQLOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ // Use TopicPartitionKey to match SaveCheckpoint
+ partitionKey := TopicPartitionKey(namespace, topicName, partition)
+
+ query := `
+ SELECT checkpoint_offset
+ FROM partition_offset_checkpoints
+ WHERE partition_key = ?
+ `
+
+ var checkpointOffset int64
+ err := s.db.QueryRow(query, partitionKey).Scan(&checkpointOffset)
+
+ if err == sql.ErrNoRows {
+ return -1, fmt.Errorf("no checkpoint found")
+ }
+
+ if err != nil {
+ return -1, fmt.Errorf("failed to load checkpoint: %w", err)
+ }
+
+ return checkpointOffset, nil
+}
+
+// GetHighestOffset finds the highest offset in storage for a partition
+func (s *SQLOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ // Use TopicPartitionKey to match SaveCheckpoint
+ partitionKey := TopicPartitionKey(namespace, topicName, partition)
+
+ // TODO: Use _index column for efficient querying
+ // ASSUMPTION: kafka_offset represents the sequential offset we're tracking
+ query := `
+ SELECT MAX(kafka_offset)
+ FROM offset_mappings
+ WHERE partition_key = ?
+ `
+
+ var highestOffset sql.NullInt64
+ err := s.db.QueryRow(query, partitionKey).Scan(&highestOffset)
+
+ if err != nil {
+ return -1, fmt.Errorf("failed to get highest offset: %w", err)
+ }
+
+ if !highestOffset.Valid {
+ return -1, fmt.Errorf("no records found")
+ }
+
+ return highestOffset.Int64, nil
+}
+
+// SaveOffsetMapping stores an offset mapping (extends OffsetStorage interface)
+func (s *SQLOffsetStorage) SaveOffsetMapping(partitionKey string, kafkaOffset, smqTimestamp int64, size int32) error {
+ now := time.Now().UnixNano()
+
+ // TODO: Handle duplicate key conflicts gracefully
+ // ASSUMPTION: Using INSERT OR REPLACE for conflict resolution
+ query := `
+ INSERT OR REPLACE INTO offset_mappings
+ (partition_key, kafka_offset, smq_timestamp, message_size, created_at)
+ VALUES (?, ?, ?, ?, ?)
+ `
+
+ _, err := s.db.Exec(query, partitionKey, kafkaOffset, smqTimestamp, size, now)
+ if err != nil {
+ return fmt.Errorf("failed to save offset mapping: %w", err)
+ }
+
+ return nil
+}
+
+// LoadOffsetMappings retrieves all offset mappings for a partition
+func (s *SQLOffsetStorage) LoadOffsetMappings(partitionKey string) ([]OffsetEntry, error) {
+ // TODO: Add pagination for large result sets
+ // ASSUMPTION: Loading all mappings for now, should be paginated in production
+ query := `
+ SELECT kafka_offset, smq_timestamp, message_size
+ FROM offset_mappings
+ WHERE partition_key = ?
+ ORDER BY kafka_offset ASC
+ `
+
+ rows, err := s.db.Query(query, partitionKey)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query offset mappings: %w", err)
+ }
+ defer rows.Close()
+
+ var entries []OffsetEntry
+ for rows.Next() {
+ var entry OffsetEntry
+ err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize)
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan offset entry: %w", err)
+ }
+ entries = append(entries, entry)
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("error iterating offset mappings: %w", err)
+ }
+
+ return entries, nil
+}
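+
+// The pagination TODO above could be addressed with keyset pagination instead of
+// loading every row (a sketch; the page size of 1000 and the lastOffset cursor
+// are assumptions, not part of this API):
+//
+//	SELECT kafka_offset, smq_timestamp, message_size
+//	FROM offset_mappings
+//	WHERE partition_key = ? AND kafka_offset > ?   -- ? = last offset of the previous page
+//	ORDER BY kafka_offset ASC
+//	LIMIT 1000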
+
+// GetOffsetMappingsByRange retrieves offset mappings within a specific range
+func (s *SQLOffsetStorage) GetOffsetMappingsByRange(partitionKey string, startOffset, endOffset int64) ([]OffsetEntry, error) {
+ // TODO: Use _index column for efficient range queries
+ query := `
+ SELECT kafka_offset, smq_timestamp, message_size
+ FROM offset_mappings
+ WHERE partition_key = ? AND kafka_offset >= ? AND kafka_offset <= ?
+ ORDER BY kafka_offset ASC
+ `
+
+ rows, err := s.db.Query(query, partitionKey, startOffset, endOffset)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query offset range: %w", err)
+ }
+ defer rows.Close()
+
+ var entries []OffsetEntry
+ for rows.Next() {
+ var entry OffsetEntry
+ err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize)
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan offset entry: %w", err)
+ }
+ entries = append(entries, entry)
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("error iterating offset range: %w", err)
+ }
+
+ return entries, nil
+}
+
+// GetPartitionStats returns statistics about a partition's offset usage
+func (s *SQLOffsetStorage) GetPartitionStats(partitionKey string) (*PartitionStats, error) {
+ query := `
+ SELECT
+ COUNT(*) as record_count,
+ MIN(kafka_offset) as earliest_offset,
+ MAX(kafka_offset) as latest_offset,
+ SUM(message_size) as total_size,
+ MIN(created_at) as first_record_time,
+ MAX(created_at) as last_record_time
+ FROM offset_mappings
+ WHERE partition_key = ?
+ `
+
+ var stats PartitionStats
+ var earliestOffset, latestOffset sql.NullInt64
+ var totalSize sql.NullInt64
+ var firstRecordTime, lastRecordTime sql.NullInt64
+
+ err := s.db.QueryRow(query, partitionKey).Scan(
+ &stats.RecordCount,
+ &earliestOffset,
+ &latestOffset,
+ &totalSize,
+ &firstRecordTime,
+ &lastRecordTime,
+ )
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to get partition stats: %w", err)
+ }
+
+ stats.PartitionKey = partitionKey
+
+ if earliestOffset.Valid {
+ stats.EarliestOffset = earliestOffset.Int64
+ } else {
+ stats.EarliestOffset = -1
+ }
+
+ if latestOffset.Valid {
+ stats.LatestOffset = latestOffset.Int64
+ stats.HighWaterMark = latestOffset.Int64 + 1
+ } else {
+ stats.LatestOffset = -1
+ stats.HighWaterMark = 0
+ }
+
+ if firstRecordTime.Valid {
+ stats.FirstRecordTime = firstRecordTime.Int64
+ }
+
+ if lastRecordTime.Valid {
+ stats.LastRecordTime = lastRecordTime.Int64
+ }
+
+ if totalSize.Valid {
+ stats.TotalSize = totalSize.Int64
+ }
+
+ return &stats, nil
+}
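+
+// Example of turning these stats into a consumer lag figure for monitoring
+// (illustrative only; committedOffset is a placeholder for wherever the
+// consumer's committed position is tracked):
+//
+//	stats, err := storage.GetPartitionStats(key)
+//	if err == nil {
+//		lag := stats.HighWaterMark - committedOffset
+//		_ = lag // export as a metric
+//	}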
+
+// CleanupOldMappings removes offset mappings older than the specified time
+func (s *SQLOffsetStorage) CleanupOldMappings(olderThanNs int64) error {
+ // TODO: Add configurable cleanup policies
+ // ASSUMPTION: Simple time-based cleanup, could be enhanced with retention policies
+ query := `
+ DELETE FROM offset_mappings
+ WHERE created_at < ?
+ `
+
+ result, err := s.db.Exec(query, olderThanNs)
+ if err != nil {
+ return fmt.Errorf("failed to cleanup old mappings: %w", err)
+ }
+
+ rowsAffected, _ := result.RowsAffected()
+ if rowsAffected > 0 {
+ // Log cleanup activity (TODO: route this through the project logger rather than stdout)
+ fmt.Printf("Cleaned up %d old offset mappings\n", rowsAffected)
+ }
+
+ return nil
+}
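+
+// A minimal retention-loop sketch built on CleanupOldMappings (illustrative only;
+// the 24h retention window and hourly interval are assumptions, not defaults of
+// this package):
+//
+//	go func() {
+//		ticker := time.NewTicker(time.Hour)
+//		defer ticker.Stop()
+//		for range ticker.C {
+//			cutoff := time.Now().Add(-24 * time.Hour).UnixNano()
+//			if err := storage.CleanupOldMappings(cutoff); err != nil {
+//				// surface the error through the caller's logger of choice
+//			}
+//		}
+//	}()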
+
+// Close closes the database connection
+func (s *SQLOffsetStorage) Close() error {
+ if s.db != nil {
+ return s.db.Close()
+ }
+ return nil
+}
+
+// PartitionStats provides statistics about a partition's offset usage
+type PartitionStats struct {
+ PartitionKey string
+ RecordCount int64
+ EarliestOffset int64 // -1 when the partition has no records
+ LatestOffset int64 // -1 when the partition has no records
+ HighWaterMark int64 // next offset to assign (LatestOffset + 1), 0 when empty
+ TotalSize int64 // sum of recorded message sizes
+ FirstRecordTime int64 // earliest created_at, unix nanoseconds
+ LastRecordTime int64 // latest created_at, unix nanoseconds
+}
+
+// GetAllPartitions returns a list of all partitions with offset data
+func (s *SQLOffsetStorage) GetAllPartitions() ([]string, error) {
+ query := `
+ SELECT DISTINCT partition_key
+ FROM offset_mappings
+ ORDER BY partition_key
+ `
+
+ rows, err := s.db.Query(query)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get all partitions: %w", err)
+ }
+ defer rows.Close()
+
+ var partitions []string
+ for rows.Next() {
+ var partitionKey string
+ if err := rows.Scan(&partitionKey); err != nil {
+ return nil, fmt.Errorf("failed to scan partition key: %w", err)
+ }
+ partitions = append(partitions, partitionKey)
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("error iterating partition keys: %w", err)
+ }
+
+ return partitions, nil
+}
+
+// Vacuum performs database maintenance operations
+func (s *SQLOffsetStorage) Vacuum() error {
+ // TODO: Add database-specific optimization commands
+ // ASSUMPTION: SQLite VACUUM command, may need adaptation for other databases
+ _, err := s.db.Exec("VACUUM")
+ if err != nil {
+ return fmt.Errorf("failed to vacuum database: %w", err)
+ }
+
+ return nil
+}
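+
+// Typical wiring outside of tests, for illustration (the database path is a
+// placeholder; the blank import of github.com/mattn/go-sqlite3 must be present
+// in the importing package, and error handling is elided):
+//
+//	db, _ := sql.Open("sqlite3", "/path/to/offsets.db")
+//	storage, _ := NewSQLOffsetStorage(db)
+//	defer storage.Close()
+//	_ = storage.SaveCheckpoint("ns", "topic", partition, 42)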
diff --git a/weed/mq/offset/sql_storage_test.go b/weed/mq/offset/sql_storage_test.go
new file mode 100644
index 000000000..661f317de
--- /dev/null
+++ b/weed/mq/offset/sql_storage_test.go
@@ -0,0 +1,516 @@
+package offset
+
+import (
+ "database/sql"
+ "os"
+ "testing"
+ "time"
+
+ _ "github.com/mattn/go-sqlite3" // SQLite driver
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func createTestDB(t *testing.T) *sql.DB {
+ // Create temporary database file
+ tmpFile, err := os.CreateTemp("", "offset_test_*.db")
+ if err != nil {
+ t.Fatalf("Failed to create temp database file: %v", err)
+ }
+ tmpFile.Close()
+
+ // Clean up the file when the test completes
+ t.Cleanup(func() {
+ os.Remove(tmpFile.Name())
+ })
+
+ db, err := sql.Open("sqlite3", tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to open database: %v", err)
+ }
+
+ t.Cleanup(func() {
+ db.Close()
+ })
+
+ return db
+}
+
+func createTestPartitionForSQL() *schema_pb.Partition {
+ return &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+}
+
+func TestSQLOffsetStorage_InitializeSchema(t *testing.T) {
+ db := createTestDB(t)
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Verify tables were created
+ tables := []string{
+ "partition_offset_checkpoints",
+ "offset_mappings",
+ }
+
+ for _, table := range tables {
+ var count int
+ err := db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?", table).Scan(&count)
+ if err != nil {
+ t.Fatalf("Failed to check table %s: %v", table, err)
+ }
+
+ if count != 1 {
+ t.Errorf("Table %s was not created", table)
+ }
+ }
+}
+
+func TestSQLOffsetStorage_SaveLoadCheckpoint(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+
+ // Test saving checkpoint
+ err = storage.SaveCheckpoint("test-namespace", "test-topic", partition, 100)
+ if err != nil {
+ t.Fatalf("Failed to save checkpoint: %v", err)
+ }
+
+ // Test loading checkpoint
+ checkpoint, err := storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to load checkpoint: %v", err)
+ }
+
+ if checkpoint != 100 {
+ t.Errorf("Expected checkpoint 100, got %d", checkpoint)
+ }
+
+ // Test updating checkpoint
+ err = storage.SaveCheckpoint("test-namespace", "test-topic", partition, 200)
+ if err != nil {
+ t.Fatalf("Failed to update checkpoint: %v", err)
+ }
+
+ checkpoint, err = storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to load updated checkpoint: %v", err)
+ }
+
+ if checkpoint != 200 {
+ t.Errorf("Expected updated checkpoint 200, got %d", checkpoint)
+ }
+}
+
+func TestSQLOffsetStorage_LoadCheckpointNotFound(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+
+ // Test loading non-existent checkpoint
+ _, err = storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ if err == nil {
+ t.Error("Expected error for non-existent checkpoint")
+ }
+}
+
+func TestSQLOffsetStorage_SaveLoadOffsetMappings(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Save multiple offset mappings
+ mappings := []struct {
+ offset int64
+ timestamp int64
+ size int32
+ }{
+ {0, 1000, 100},
+ {1, 2000, 150},
+ {2, 3000, 200},
+ }
+
+ for _, mapping := range mappings {
+ err := storage.SaveOffsetMapping(partitionKey, mapping.offset, mapping.timestamp, mapping.size)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Load offset mappings
+ entries, err := storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load offset mappings: %v", err)
+ }
+
+ if len(entries) != len(mappings) {
+ t.Errorf("Expected %d entries, got %d", len(mappings), len(entries))
+ }
+
+ // Verify entries are sorted by offset
+ for i, entry := range entries {
+ expected := mappings[i]
+ if entry.KafkaOffset != expected.offset {
+ t.Errorf("Entry %d: expected offset %d, got %d", i, expected.offset, entry.KafkaOffset)
+ }
+ if entry.SMQTimestamp != expected.timestamp {
+ t.Errorf("Entry %d: expected timestamp %d, got %d", i, expected.timestamp, entry.SMQTimestamp)
+ }
+ if entry.MessageSize != expected.size {
+ t.Errorf("Entry %d: expected size %d, got %d", i, expected.size, entry.MessageSize)
+ }
+ }
+}
+
+func TestSQLOffsetStorage_GetHighestOffset(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := TopicPartitionKey("test-namespace", "test-topic", partition)
+
+ // Test empty partition
+ _, err = storage.GetHighestOffset("test-namespace", "test-topic", partition)
+ if err == nil {
+ t.Error("Expected error for empty partition")
+ }
+
+ // Add some offset mappings
+ offsets := []int64{5, 1, 3, 2, 4}
+ for _, offset := range offsets {
+ err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Get highest offset
+ highest, err := storage.GetHighestOffset("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get highest offset: %v", err)
+ }
+
+ if highest != 5 {
+ t.Errorf("Expected highest offset 5, got %d", highest)
+ }
+}
+
+func TestSQLOffsetStorage_GetOffsetMappingsByRange(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Add offset mappings
+ for i := int64(0); i < 10; i++ {
+ err := storage.SaveOffsetMapping(partitionKey, i, i*1000, 100)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Get range of offsets
+ entries, err := storage.GetOffsetMappingsByRange(partitionKey, 3, 7)
+ if err != nil {
+ t.Fatalf("Failed to get offset range: %v", err)
+ }
+
+ expectedCount := 5 // offsets 3, 4, 5, 6, 7
+ if len(entries) != expectedCount {
+ t.Errorf("Expected %d entries, got %d", expectedCount, len(entries))
+ }
+
+ // Verify range
+ for i, entry := range entries {
+ expectedOffset := int64(3 + i)
+ if entry.KafkaOffset != expectedOffset {
+ t.Errorf("Entry %d: expected offset %d, got %d", i, expectedOffset, entry.KafkaOffset)
+ }
+ }
+}
+
+func TestSQLOffsetStorage_GetPartitionStats(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Test empty partition stats
+ stats, err := storage.GetPartitionStats(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to get empty partition stats: %v", err)
+ }
+
+ if stats.RecordCount != 0 {
+ t.Errorf("Expected record count 0, got %d", stats.RecordCount)
+ }
+
+ if stats.EarliestOffset != -1 {
+ t.Errorf("Expected earliest offset -1, got %d", stats.EarliestOffset)
+ }
+
+ // Add some data
+ sizes := []int32{100, 150, 200}
+ for i, size := range sizes {
+ err := storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), size)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Get stats with data
+ stats, err = storage.GetPartitionStats(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to get partition stats: %v", err)
+ }
+
+ if stats.RecordCount != 3 {
+ t.Errorf("Expected record count 3, got %d", stats.RecordCount)
+ }
+
+ if stats.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", stats.EarliestOffset)
+ }
+
+ if stats.LatestOffset != 2 {
+ t.Errorf("Expected latest offset 2, got %d", stats.LatestOffset)
+ }
+
+ if stats.HighWaterMark != 3 {
+ t.Errorf("Expected high water mark 3, got %d", stats.HighWaterMark)
+ }
+
+ expectedTotalSize := int64(100 + 150 + 200)
+ if stats.TotalSize != expectedTotalSize {
+ t.Errorf("Expected total size %d, got %d", expectedTotalSize, stats.TotalSize)
+ }
+}
+
+func TestSQLOffsetStorage_GetAllPartitions(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Test empty database
+ partitions, err := storage.GetAllPartitions()
+ if err != nil {
+ t.Fatalf("Failed to get all partitions: %v", err)
+ }
+
+ if len(partitions) != 0 {
+ t.Errorf("Expected 0 partitions, got %d", len(partitions))
+ }
+
+ // Add data for multiple partitions
+ partition1 := createTestPartitionForSQL()
+ partition2 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 32,
+ RangeStop: 63,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ partitionKey1 := partitionKey(partition1)
+ partitionKey2 := partitionKey(partition2)
+
+ storage.SaveOffsetMapping(partitionKey1, 0, 1000, 100)
+ storage.SaveOffsetMapping(partitionKey2, 0, 2000, 150)
+
+ // Get all partitions
+ partitions, err = storage.GetAllPartitions()
+ if err != nil {
+ t.Fatalf("Failed to get all partitions: %v", err)
+ }
+
+ if len(partitions) != 2 {
+ t.Errorf("Expected 2 partitions, got %d", len(partitions))
+ }
+
+ // Verify partition keys are present
+ partitionMap := make(map[string]bool)
+ for _, p := range partitions {
+ partitionMap[p] = true
+ }
+
+ if !partitionMap[partitionKey1] {
+ t.Errorf("Partition key %s not found", partitionKey1)
+ }
+
+ if !partitionMap[partitionKey2] {
+ t.Errorf("Partition key %s not found", partitionKey2)
+ }
+}
+
+func TestSQLOffsetStorage_CleanupOldMappings(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Add mappings with different timestamps
+ now := time.Now().UnixNano()
+
+ // Add old mapping by directly inserting with old timestamp
+ oldTime := now - (24 * time.Hour).Nanoseconds() // 24 hours ago
+ _, err = db.Exec(`
+ INSERT INTO offset_mappings
+ (partition_key, kafka_offset, smq_timestamp, message_size, created_at)
+ VALUES (?, ?, ?, ?, ?)
+ `, partitionKey, 0, oldTime, 100, oldTime)
+ if err != nil {
+ t.Fatalf("Failed to insert old mapping: %v", err)
+ }
+
+ // Add recent mapping
+ storage.SaveOffsetMapping(partitionKey, 1, now, 150)
+
+ // Verify both mappings exist
+ entries, err := storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load mappings: %v", err)
+ }
+
+ if len(entries) != 2 {
+ t.Errorf("Expected 2 mappings before cleanup, got %d", len(entries))
+ }
+
+ // Cleanup old mappings (older than 12 hours)
+ cutoffTime := now - (12 * time.Hour).Nanoseconds()
+ err = storage.CleanupOldMappings(cutoffTime)
+ if err != nil {
+ t.Fatalf("Failed to cleanup old mappings: %v", err)
+ }
+
+ // Verify only recent mapping remains
+ entries, err = storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load mappings after cleanup: %v", err)
+ }
+
+ if len(entries) != 1 {
+ t.Errorf("Expected 1 mapping after cleanup, got %d", len(entries))
+ }
+
+ if entries[0].KafkaOffset != 1 {
+ t.Errorf("Expected remaining mapping offset 1, got %d", entries[0].KafkaOffset)
+ }
+}
+
+func TestSQLOffsetStorage_Vacuum(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Vacuum should not fail on empty database
+ err = storage.Vacuum()
+ if err != nil {
+ t.Fatalf("Failed to vacuum database: %v", err)
+ }
+
+ // Add some data and vacuum again
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+ storage.SaveOffsetMapping(partitionKey, 0, 1000, 100)
+
+ err = storage.Vacuum()
+ if err != nil {
+ t.Fatalf("Failed to vacuum database with data: %v", err)
+ }
+}
+
+func TestSQLOffsetStorage_ConcurrentAccess(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Test concurrent writes
+ const numGoroutines = 10
+ const offsetsPerGoroutine = 10
+
+ done := make(chan bool, numGoroutines)
+
+ for i := 0; i < numGoroutines; i++ {
+ go func(goroutineID int) {
+ defer func() { done <- true }()
+
+ for j := 0; j < offsetsPerGoroutine; j++ {
+ offset := int64(goroutineID*offsetsPerGoroutine + j)
+ err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100)
+ if err != nil {
+ t.Errorf("Failed to save offset mapping %d: %v", offset, err)
+ return
+ }
+ }
+ }(i)
+ }
+
+ // Wait for all goroutines to complete
+ for i := 0; i < numGoroutines; i++ {
+ <-done
+ }
+
+ // Verify all mappings were saved
+ entries, err := storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load mappings: %v", err)
+ }
+
+ expectedCount := numGoroutines * offsetsPerGoroutine
+ if len(entries) != expectedCount {
+ t.Errorf("Expected %d mappings, got %d", expectedCount, len(entries))
+ }
+}
diff --git a/weed/mq/offset/storage.go b/weed/mq/offset/storage.go
new file mode 100644
index 000000000..b3eaddd6b
--- /dev/null
+++ b/weed/mq/offset/storage.go
@@ -0,0 +1,5 @@
+package offset
+
+// Note: OffsetStorage interface is defined in manager.go
+// Production implementations: FilerOffsetStorage (filer_storage.go), SQLOffsetStorage (sql_storage.go)
+// Test implementation: InMemoryOffsetStorage (storage_test.go)
diff --git a/weed/mq/offset/subscriber.go b/weed/mq/offset/subscriber.go
new file mode 100644
index 000000000..d39932aae
--- /dev/null
+++ b/weed/mq/offset/subscriber.go
@@ -0,0 +1,355 @@
+package offset
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// OffsetSubscriber handles offset-based subscription logic
+type OffsetSubscriber struct {
+ mu sync.RWMutex
+ offsetRegistry *PartitionOffsetRegistry
+ subscriptions map[string]*OffsetSubscription
+}
+
+// OffsetSubscription represents an active offset-based subscription.
+// A subscription is not safe for concurrent use by multiple goroutines;
+// callers must serialize access to it themselves.
+type OffsetSubscription struct {
+ ID string
+ Namespace string
+ TopicName string
+ Partition *schema_pb.Partition
+ StartOffset int64
+ CurrentOffset int64
+ OffsetType schema_pb.OffsetType
+ IsActive bool
+ offsetRegistry *PartitionOffsetRegistry
+}
+
+// NewOffsetSubscriber creates a new offset-based subscriber
+func NewOffsetSubscriber(offsetRegistry *PartitionOffsetRegistry) *OffsetSubscriber {
+ return &OffsetSubscriber{
+ offsetRegistry: offsetRegistry,
+ subscriptions: make(map[string]*OffsetSubscription),
+ }
+}
+
+// CreateSubscription creates a new offset-based subscription
+func (s *OffsetSubscriber) CreateSubscription(
+ subscriptionID string,
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ offsetType schema_pb.OffsetType,
+ startOffset int64,
+) (*OffsetSubscription, error) {
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Check if subscription already exists
+ if _, exists := s.subscriptions[subscriptionID]; exists {
+ return nil, fmt.Errorf("subscription %s already exists", subscriptionID)
+ }
+
+ // Resolve the actual start offset based on type
+ actualStartOffset, err := s.resolveStartOffset(namespace, topicName, partition, offsetType, startOffset)
+ if err != nil {
+ return nil, fmt.Errorf("failed to resolve start offset: %w", err)
+ }
+
+ subscription := &OffsetSubscription{
+ ID: subscriptionID,
+ Namespace: namespace,
+ TopicName: topicName,
+ Partition: partition,
+ StartOffset: actualStartOffset,
+ CurrentOffset: actualStartOffset,
+ OffsetType: offsetType,
+ IsActive: true,
+ offsetRegistry: s.offsetRegistry,
+ }
+
+ s.subscriptions[subscriptionID] = subscription
+ return subscription, nil
+}
+
+// GetSubscription retrieves an existing subscription
+func (s *OffsetSubscriber) GetSubscription(subscriptionID string) (*OffsetSubscription, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ subscription, exists := s.subscriptions[subscriptionID]
+ if !exists {
+ return nil, fmt.Errorf("subscription %s not found", subscriptionID)
+ }
+
+ return subscription, nil
+}
+
+// CloseSubscription closes and removes a subscription
+func (s *OffsetSubscriber) CloseSubscription(subscriptionID string) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ subscription, exists := s.subscriptions[subscriptionID]
+ if !exists {
+ return fmt.Errorf("subscription %s not found", subscriptionID)
+ }
+
+ subscription.IsActive = false
+ delete(s.subscriptions, subscriptionID)
+ return nil
+}
+
+// resolveStartOffset resolves the actual start offset based on OffsetType
+func (s *OffsetSubscriber) resolveStartOffset(
+ namespace, topicName string,
+ partition *schema_pb.Partition,
+ offsetType schema_pb.OffsetType,
+ requestedOffset int64,
+) (int64, error) {
+
+ switch offsetType {
+ case schema_pb.OffsetType_EXACT_OFFSET:
+ // Validate that the requested offset exists
+ return s.validateAndGetOffset(namespace, topicName, partition, requestedOffset)
+
+ case schema_pb.OffsetType_RESET_TO_OFFSET:
+ // Use the requested offset, even if it doesn't exist yet
+ return requestedOffset, nil
+
+ case schema_pb.OffsetType_RESET_TO_EARLIEST:
+ // Start from offset 0
+ return 0, nil
+
+ case schema_pb.OffsetType_RESET_TO_LATEST:
+ // Start from the current high water mark
+ hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return 0, err
+ }
+ return hwm, nil
+
+ case schema_pb.OffsetType_RESUME_OR_EARLIEST:
+ // Try to resume from a saved position, falling back to earliest
+ // For now, just use earliest (consumer group position tracking will be added later)
+ return 0, nil
+
+ case schema_pb.OffsetType_RESUME_OR_LATEST:
+ // Try to resume from a saved position, falling back to latest
+ // For now, just use latest
+ hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return 0, err
+ }
+ return hwm, nil
+
+ default:
+ return 0, fmt.Errorf("unsupported offset type: %v", offsetType)
+ }
+}
+
+// validateAndGetOffset validates that an offset exists and returns it
+func (s *OffsetSubscriber) validateAndGetOffset(namespace, topicName string, partition *schema_pb.Partition, offset int64) (int64, error) {
+ if offset < 0 {
+ return 0, fmt.Errorf("offset cannot be negative: %d", offset)
+ }
+
+ // Get the current high water mark
+ hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return 0, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ // Check if offset is within valid range
+ if offset >= hwm {
+ return 0, fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm)
+ }
+
+ return offset, nil
+}
+
+// SeekToOffset seeks a subscription to a specific offset
+func (sub *OffsetSubscription) SeekToOffset(offset int64) error {
+ if !sub.IsActive {
+ return fmt.Errorf("subscription is not active")
+ }
+
+ // Validate the offset
+ if offset < 0 {
+ return fmt.Errorf("offset cannot be negative: %d", offset)
+ }
+
+ hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+ if err != nil {
+ return fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ if offset > hwm {
+ return fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm)
+ }
+
+ sub.CurrentOffset = offset
+ return nil
+}
+
+// GetNextOffset returns the next offset to read
+func (sub *OffsetSubscription) GetNextOffset() int64 {
+ return sub.CurrentOffset
+}
+
+// AdvanceOffset advances the subscription to the next offset
+func (sub *OffsetSubscription) AdvanceOffset() {
+ sub.CurrentOffset++
+}
+
+// GetLag returns the lag between current position and high water mark
+func (sub *OffsetSubscription) GetLag() (int64, error) {
+ if !sub.IsActive {
+ return 0, fmt.Errorf("subscription is not active")
+ }
+
+ hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+ if err != nil {
+ return 0, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ lag := hwm - sub.CurrentOffset
+ if lag < 0 {
+ lag = 0
+ }
+
+ return lag, nil
+}
+
+// IsAtEnd checks if the subscription has reached the end of available data
+func (sub *OffsetSubscription) IsAtEnd() (bool, error) {
+ if !sub.IsActive {
+ return true, fmt.Errorf("subscription is not active")
+ }
+
+ hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+ if err != nil {
+ return false, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ return sub.CurrentOffset >= hwm, nil
+}
+
+// OffsetRange represents a range of offsets
+type OffsetRange struct {
+ StartOffset int64
+ EndOffset int64
+ Count int64
+}
+
+// GetOffsetRange returns a range of offsets for batch reading
+func (sub *OffsetSubscription) GetOffsetRange(maxCount int64) (*OffsetRange, error) {
+ if !sub.IsActive {
+ return nil, fmt.Errorf("subscription is not active")
+ }
+
+ if maxCount <= 0 {
+ return nil, fmt.Errorf("maxCount must be positive: %d", maxCount)
+ }
+
+ hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ startOffset := sub.CurrentOffset
+ endOffset := startOffset + maxCount - 1
+
+ // Don't go beyond high water mark
+ if endOffset >= hwm {
+ endOffset = hwm - 1
+ }
+
+ // If start is already at or beyond HWM, return empty range
+ if startOffset >= hwm {
+ return &OffsetRange{
+ StartOffset: startOffset,
+ EndOffset: startOffset - 1, // Empty range
+ Count: 0,
+ }, nil
+ }
+
+ count := endOffset - startOffset + 1
+ return &OffsetRange{
+ StartOffset: startOffset,
+ EndOffset: endOffset,
+ Count: count,
+ }, nil
+}
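+
+// A typical batch-consume loop built on GetOffsetRange (a sketch; reading the
+// messages for the returned range is left to the caller):
+//
+//	for {
+//		r, err := sub.GetOffsetRange(100)
+//		if err != nil || r.Count == 0 {
+//			break
+//		}
+//		// fetch and process offsets r.StartOffset..r.EndOffset here
+//		sub.AdvanceOffsetBy(r.Count)
+//	}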
+
+// AdvanceOffsetBy advances the subscription by a specific number of offsets
+func (sub *OffsetSubscription) AdvanceOffsetBy(count int64) {
+ sub.CurrentOffset += count
+}
+
+// OffsetSeeker provides utilities for offset-based seeking
+type OffsetSeeker struct {
+ offsetRegistry *PartitionOffsetRegistry
+}
+
+// NewOffsetSeeker creates a new offset seeker
+func NewOffsetSeeker(offsetRegistry *PartitionOffsetRegistry) *OffsetSeeker {
+ return &OffsetSeeker{
+ offsetRegistry: offsetRegistry,
+ }
+}
+
+// SeekToTimestamp finds the offset closest to a given timestamp
+// This bridges offset-based and timestamp-based seeking
+func (seeker *OffsetSeeker) SeekToTimestamp(partition *schema_pb.Partition, timestamp int64) (int64, error) {
+ // TODO: This requires integration with the storage layer to map timestamps to offsets
+ // For now, return an error indicating this feature needs implementation
+ return 0, fmt.Errorf("timestamp-to-offset mapping not implemented yet")
+}
+
+// ValidateOffsetRange validates that an offset range is valid
+func (seeker *OffsetSeeker) ValidateOffsetRange(namespace, topicName string, partition *schema_pb.Partition, startOffset, endOffset int64) error {
+ if startOffset < 0 {
+ return fmt.Errorf("start offset cannot be negative: %d", startOffset)
+ }
+
+ if endOffset < startOffset {
+ return fmt.Errorf("end offset %d cannot be less than start offset %d", endOffset, startOffset)
+ }
+
+ hwm, err := seeker.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ if startOffset >= hwm {
+ return fmt.Errorf("start offset %d is beyond high water mark %d", startOffset, hwm)
+ }
+
+ if endOffset >= hwm {
+ return fmt.Errorf("end offset %d is beyond high water mark %d", endOffset, hwm)
+ }
+
+ return nil
+}
+
+// GetAvailableOffsetRange returns the range of available offsets for a partition
+func (seeker *OffsetSeeker) GetAvailableOffsetRange(namespace, topicName string, partition *schema_pb.Partition) (*OffsetRange, error) {
+ hwm, err := seeker.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get high water mark: %w", err)
+ }
+
+ if hwm == 0 {
+ // No data available
+ return &OffsetRange{
+ StartOffset: 0,
+ EndOffset: -1,
+ Count: 0,
+ }, nil
+ }
+
+ return &OffsetRange{
+ StartOffset: 0,
+ EndOffset: hwm - 1,
+ Count: hwm,
+ }, nil
+}
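+
+// End-to-end subscription lifecycle, for illustration only (the subscription ID,
+// namespace and topic are placeholders, and error handling is elided):
+//
+//	subscriber := NewOffsetSubscriber(registry)
+//	sub, _ := subscriber.CreateSubscription("sub-1", "ns", "topic", partition,
+//		schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+//	lag, _ := sub.GetLag()      // distance to the high water mark
+//	_ = lag
+//	_ = sub.SeekToOffset(0)     // reposition explicitly when needed
+//	_ = subscriber.CloseSubscription("sub-1")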
diff --git a/weed/mq/offset/subscriber_test.go b/weed/mq/offset/subscriber_test.go
new file mode 100644
index 000000000..1ab97dadc
--- /dev/null
+++ b/weed/mq/offset/subscriber_test.go
@@ -0,0 +1,457 @@
+package offset
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func TestOffsetSubscriber_CreateSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign some offsets first
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 10)
+
+ // Test EXACT_OFFSET subscription
+ sub, err := subscriber.CreateSubscription("test-sub-1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 5)
+ if err != nil {
+ t.Fatalf("Failed to create EXACT_OFFSET subscription: %v", err)
+ }
+
+ if sub.StartOffset != 5 {
+ t.Errorf("Expected start offset 5, got %d", sub.StartOffset)
+ }
+ if sub.CurrentOffset != 5 {
+ t.Errorf("Expected current offset 5, got %d", sub.CurrentOffset)
+ }
+
+ // Test RESET_TO_LATEST subscription
+ sub2, err := subscriber.CreateSubscription("test-sub-2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create RESET_TO_LATEST subscription: %v", err)
+ }
+
+ if sub2.StartOffset != 10 { // Should be at high water mark
+ t.Errorf("Expected start offset 10, got %d", sub2.StartOffset)
+ }
+}
+
+func TestOffsetSubscriber_InvalidSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign some offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 5)
+
+ // Test invalid offset (beyond high water mark)
+ _, err := subscriber.CreateSubscription("invalid-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 10)
+ if err == nil {
+ t.Error("Expected error for offset beyond high water mark")
+ }
+
+ // Test negative offset
+ _, err = subscriber.CreateSubscription("invalid-sub-2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, -1)
+ if err == nil {
+ t.Error("Expected error for negative offset")
+ }
+}
+
+func TestOffsetSubscriber_DuplicateSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Create first subscription
+ _, err := subscriber.CreateSubscription("duplicate-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create first subscription: %v", err)
+ }
+
+ // Try to create duplicate
+ _, err = subscriber.CreateSubscription("duplicate-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ if err == nil {
+ t.Error("Expected error for duplicate subscription ID")
+ }
+}
+
+func TestOffsetSubscription_SeekToOffset(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 20)
+
+ // Create subscription
+ sub, err := subscriber.CreateSubscription("seek-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Test valid seek
+ err = sub.SeekToOffset(10)
+ if err != nil {
+ t.Fatalf("Failed to seek to offset 10: %v", err)
+ }
+
+ if sub.CurrentOffset != 10 {
+ t.Errorf("Expected current offset 10, got %d", sub.CurrentOffset)
+ }
+
+ // Test invalid seek (beyond high water mark)
+ err = sub.SeekToOffset(25)
+ if err == nil {
+ t.Error("Expected error for seek beyond high water mark")
+ }
+
+ // Test negative seek
+ err = sub.SeekToOffset(-1)
+ if err == nil {
+ t.Error("Expected error for negative seek offset")
+ }
+}
+
+func TestOffsetSubscription_AdvanceOffset(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Create subscription
+ sub, err := subscriber.CreateSubscription("advance-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Test single advance
+ initialOffset := sub.GetNextOffset()
+ sub.AdvanceOffset()
+
+ if sub.GetNextOffset() != initialOffset+1 {
+ t.Errorf("Expected offset %d, got %d", initialOffset+1, sub.GetNextOffset())
+ }
+
+ // Test batch advance
+ sub.AdvanceOffsetBy(5)
+
+ if sub.GetNextOffset() != initialOffset+6 {
+ t.Errorf("Expected offset %d, got %d", initialOffset+6, sub.GetNextOffset())
+ }
+}
+
+func TestOffsetSubscription_GetLag(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 15)
+
+ // Create subscription at offset 5
+ sub, err := subscriber.CreateSubscription("lag-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 5)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Check initial lag
+ lag, err := sub.GetLag()
+ if err != nil {
+ t.Fatalf("Failed to get lag: %v", err)
+ }
+
+ expectedLag := int64(15 - 5) // hwm - current
+ if lag != expectedLag {
+ t.Errorf("Expected lag %d, got %d", expectedLag, lag)
+ }
+
+ // Advance and check lag again
+ sub.AdvanceOffsetBy(3)
+
+ lag, err = sub.GetLag()
+ if err != nil {
+ t.Fatalf("Failed to get lag after advance: %v", err)
+ }
+
+ expectedLag = int64(15 - 8) // hwm - current
+ if lag != expectedLag {
+ t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
+ }
+}
+
+func TestOffsetSubscription_IsAtEnd(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 10)
+
+ // Create subscription at end
+ sub, err := subscriber.CreateSubscription("end-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Should be at end
+ atEnd, err := sub.IsAtEnd()
+ if err != nil {
+ t.Fatalf("Failed to check if at end: %v", err)
+ }
+
+ if !atEnd {
+ t.Error("Expected subscription to be at end")
+ }
+
+ // Seek to middle and check again
+ if err := sub.SeekToOffset(5); err != nil {
+ t.Fatalf("Failed to seek to offset 5: %v", err)
+ }
+
+ atEnd, err = sub.IsAtEnd()
+ if err != nil {
+ t.Fatalf("Failed to check if at end after seek: %v", err)
+ }
+
+ if atEnd {
+ t.Error("Expected subscription not to be at end after seek")
+ }
+}
+
+func TestOffsetSubscription_GetOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 20)
+
+ // Create subscription
+ sub, err := subscriber.CreateSubscription("range-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_EXACT_OFFSET, 5)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Test normal range
+ offsetRange, err := sub.GetOffsetRange(10)
+ if err != nil {
+ t.Fatalf("Failed to get offset range: %v", err)
+ }
+
+ if offsetRange.StartOffset != 5 {
+ t.Errorf("Expected start offset 5, got %d", offsetRange.StartOffset)
+ }
+ if offsetRange.EndOffset != 14 {
+ t.Errorf("Expected end offset 14, got %d", offsetRange.EndOffset)
+ }
+ if offsetRange.Count != 10 {
+ t.Errorf("Expected count 10, got %d", offsetRange.Count)
+ }
+
+ // Test range that exceeds high water mark
+ if err := sub.SeekToOffset(15); err != nil {
+ t.Fatalf("Failed to seek to offset 15: %v", err)
+ }
+ offsetRange, err = sub.GetOffsetRange(10)
+ if err != nil {
+ t.Fatalf("Failed to get offset range near end: %v", err)
+ }
+
+ if offsetRange.StartOffset != 15 {
+ t.Errorf("Expected start offset 15, got %d", offsetRange.StartOffset)
+ }
+ if offsetRange.EndOffset != 19 { // Should be capped at hwm-1
+ t.Errorf("Expected end offset 19, got %d", offsetRange.EndOffset)
+ }
+ if offsetRange.Count != 5 {
+ t.Errorf("Expected count 5, got %d", offsetRange.Count)
+ }
+}
+
+func TestOffsetSubscription_EmptyRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 10)
+
+ // Create subscription at end
+ sub, err := subscriber.CreateSubscription("empty-range-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Request range when at end
+ offsetRange, err := sub.GetOffsetRange(5)
+ if err != nil {
+ t.Fatalf("Failed to get offset range at end: %v", err)
+ }
+
+ if offsetRange.Count != 0 {
+ t.Errorf("Expected empty range (count 0), got count %d", offsetRange.Count)
+ }
+
+ if offsetRange.StartOffset != 10 {
+ t.Errorf("Expected start offset 10, got %d", offsetRange.StartOffset)
+ }
+
+ if offsetRange.EndOffset != 9 { // Empty range: end < start
+ t.Errorf("Expected end offset 9 (empty range), got %d", offsetRange.EndOffset)
+ }
+}
+
+func TestOffsetSeeker_ValidateOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ seeker := NewOffsetSeeker(registry)
+ partition := createTestPartition()
+
+ // Assign offsets
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 15)
+
+ // Test valid range
+ err := seeker.ValidateOffsetRange("test-namespace", "test-topic", partition, 5, 10)
+ if err != nil {
+ t.Errorf("Valid range should not return error: %v", err)
+ }
+
+ // Test invalid ranges
+ testCases := []struct {
+ name string
+ startOffset int64
+ endOffset int64
+ expectError bool
+ }{
+ {"negative start", -1, 5, true},
+ {"end before start", 10, 5, true},
+ {"start beyond hwm", 20, 25, true},
+ {"valid range", 0, 14, false},
+ {"single offset", 5, 5, false},
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ err := seeker.ValidateOffsetRange("test-namespace", "test-topic", partition, tc.startOffset, tc.endOffset)
+ if tc.expectError && err == nil {
+ t.Error("Expected error but got none")
+ }
+ if !tc.expectError && err != nil {
+ t.Errorf("Expected no error but got: %v", err)
+ }
+ })
+ }
+}
+
+func TestOffsetSeeker_GetAvailableOffsetRange(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ seeker := NewOffsetSeeker(registry)
+ partition := createTestPartition()
+
+ // Test empty partition
+ offsetRange, err := seeker.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get available range for empty partition: %v", err)
+ }
+
+ if offsetRange.Count != 0 {
+ t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
+ }
+
+ // Assign offsets and test again
+ registry.AssignOffsets("test-namespace", "test-topic", partition, 25)
+
+ offsetRange, err = seeker.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get available range: %v", err)
+ }
+
+ if offsetRange.StartOffset != 0 {
+ t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
+ }
+ if offsetRange.EndOffset != 24 {
+ t.Errorf("Expected end offset 24, got %d", offsetRange.EndOffset)
+ }
+ if offsetRange.Count != 25 {
+ t.Errorf("Expected count 25, got %d", offsetRange.Count)
+ }
+}
+
+func TestOffsetSubscriber_CloseSubscription(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Create subscription
+ sub, err := subscriber.CreateSubscription("close-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ // Verify subscription exists
+ _, err = subscriber.GetSubscription("close-test")
+ if err != nil {
+ t.Fatalf("Subscription should exist: %v", err)
+ }
+
+ // Close subscription
+ err = subscriber.CloseSubscription("close-test")
+ if err != nil {
+ t.Fatalf("Failed to close subscription: %v", err)
+ }
+
+ // Verify subscription is gone
+ _, err = subscriber.GetSubscription("close-test")
+ if err == nil {
+ t.Error("Subscription should not exist after close")
+ }
+
+ // Verify subscription is marked inactive
+ if sub.IsActive {
+ t.Error("Subscription should be marked inactive after close")
+ }
+}
+
+func TestOffsetSubscription_InactiveOperations(t *testing.T) {
+ storage := NewInMemoryOffsetStorage()
+ registry := NewPartitionOffsetRegistry(storage)
+ subscriber := NewOffsetSubscriber(registry)
+ partition := createTestPartition()
+
+ // Create and close subscription
+ sub, err := subscriber.CreateSubscription("inactive-test", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+
+ subscriber.CloseSubscription("inactive-test")
+
+ // Test operations on inactive subscription
+ err = sub.SeekToOffset(5)
+ if err == nil {
+ t.Error("Expected error for seek on inactive subscription")
+ }
+
+ _, err = sub.GetLag()
+ if err == nil {
+ t.Error("Expected error for GetLag on inactive subscription")
+ }
+
+ _, err = sub.IsAtEnd()
+ if err == nil {
+ t.Error("Expected error for IsAtEnd on inactive subscription")
+ }
+
+ _, err = sub.GetOffsetRange(10)
+ if err == nil {
+ t.Error("Expected error for GetOffsetRange on inactive subscription")
+ }
+}