aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/filer_storage.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/offset/filer_storage.go')
-rw-r--r--weed/mq/offset/filer_storage.go100
1 files changed, 100 insertions, 0 deletions
diff --git a/weed/mq/offset/filer_storage.go b/weed/mq/offset/filer_storage.go
new file mode 100644
index 000000000..81be78470
--- /dev/null
+++ b/weed/mq/offset/filer_storage.go
@@ -0,0 +1,100 @@
+package offset
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// FilerOffsetStorage implements OffsetStorage using SeaweedFS filer
+// Stores offset data as files in the same directory structure as SMQ
+// Path: /topics/{namespace}/{topic}/{version}/{partition}/checkpoint.offset
+// The namespace and topic are derived from the actual partition information
+type FilerOffsetStorage struct {
+ filerClientAccessor *filer_client.FilerClientAccessor
+}
+
+// NewFilerOffsetStorageWithAccessor creates a new filer-based offset storage using existing filer client accessor
+func NewFilerOffsetStorageWithAccessor(filerClientAccessor *filer_client.FilerClientAccessor) *FilerOffsetStorage {
+ return &FilerOffsetStorage{
+ filerClientAccessor: filerClientAccessor,
+ }
+}
+
+// SaveCheckpoint saves the checkpoint for a partition
+// Stores as: /topics/{namespace}/{topic}/{version}/{partition}/checkpoint.offset
+func (f *FilerOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
+ partitionDir := f.getPartitionDir(namespace, topicName, partition)
+ fileName := "checkpoint.offset"
+
+ // Use SMQ's 8-byte offset format
+ offsetBytes := make([]byte, 8)
+ util.Uint64toBytes(offsetBytes, uint64(offset))
+
+ return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, partitionDir, fileName, offsetBytes)
+ })
+}
+
+// LoadCheckpoint loads the checkpoint for a partition
+func (f *FilerOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ partitionDir := f.getPartitionDir(namespace, topicName, partition)
+ fileName := "checkpoint.offset"
+
+ var offset int64 = -1
+ err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ data, err := filer.ReadInsideFiler(client, partitionDir, fileName)
+ if err != nil {
+ return err
+ }
+ if len(data) != 8 {
+ return fmt.Errorf("invalid checkpoint file format: expected 8 bytes, got %d", len(data))
+ }
+ offset = int64(util.BytesToUint64(data))
+ return nil
+ })
+
+ if err != nil {
+ return -1, err
+ }
+
+ return offset, nil
+}
+
+// GetHighestOffset returns the highest offset stored for a partition
+// For filer storage, this is the same as the checkpoint since we don't store individual records
+func (f *FilerOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ return f.LoadCheckpoint(namespace, topicName, partition)
+}
+
+// Reset clears all data for testing
+func (f *FilerOffsetStorage) Reset() error {
+ // For testing, we could delete all offset files, but this is dangerous
+ // Instead, just return success - individual tests should clean up their own data
+ return nil
+}
+
+// Helper methods
+
+// getPartitionDir returns the directory path for a partition following SMQ convention
+// Format: /topics/{namespace}/{topic}/{version}/{partition}
+func (f *FilerOffsetStorage) getPartitionDir(namespace, topicName string, partition *schema_pb.Partition) string {
+ // Generate version from UnixTimeNs
+ version := time.Unix(0, partition.UnixTimeNs).UTC().Format("v2006-01-02-15-04-05")
+
+ // Generate partition range string
+ partitionRange := fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
+
+ return fmt.Sprintf("%s/%s/%s/%s/%s", filer.TopicsDir, namespace, topicName, version, partitionRange)
+}
+
+// getPartitionKey generates a unique key for a partition
+func (f *FilerOffsetStorage) getPartitionKey(partition *schema_pb.Partition) string {
+ return fmt.Sprintf("ring:%d:range:%d-%d:time:%d",
+ partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs)
+}