Diffstat (limited to 'weed/mq/kafka/consumer_offset')
-rw-r--r--  weed/mq/kafka/consumer_offset/filer_storage.go        322
-rw-r--r--  weed/mq/kafka/consumer_offset/filer_storage_test.go    66
-rw-r--r--  weed/mq/kafka/consumer_offset/memory_storage.go       145
-rw-r--r--  weed/mq/kafka/consumer_offset/memory_storage_test.go  209
-rw-r--r--  weed/mq/kafka/consumer_offset/storage.go                59
5 files changed, 801 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer_offset/filer_storage.go b/weed/mq/kafka/consumer_offset/filer_storage.go
new file mode 100644
index 000000000..6edc9d5aa
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/filer_storage.go
@@ -0,0 +1,322 @@
+package consumer_offset
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// KafkaConsumerPosition represents a Kafka consumer's position
+// Can be either offset-based or timestamp-based
+type KafkaConsumerPosition struct {
+ Type string `json:"type"` // "offset" or "timestamp"
+ Value int64 `json:"value"` // The actual offset or timestamp value
+ CommittedAt int64 `json:"committed_at"` // Unix timestamp in milliseconds when committed
+ Metadata string `json:"metadata"` // Optional: application-specific metadata
+}
+
+// FilerStorage implements OffsetStorage using the SeaweedFS filer.
+// Each offset is stored as a JSON document at /kafka/consumer_offsets/{group}/{topic}/{partition}/offset,
+// supporting both offset-based and timestamp-based positioning.
+type FilerStorage struct {
+ fca *filer_client.FilerClientAccessor
+ closed bool
+}
+
+// NewFilerStorage creates a new filer-based offset storage
+func NewFilerStorage(fca *filer_client.FilerClientAccessor) *FilerStorage {
+ return &FilerStorage{
+ fca: fca,
+ closed: false,
+ }
+}
+
+// CommitOffset commits an offset for a consumer group.
+// The position is stored as JSON so both offset and timestamp positioning can be represented.
+func (f *FilerStorage) CommitOffset(group, topic string, partition int32, offset int64, metadata string) error {
+ if f.closed {
+ return ErrStorageClosed
+ }
+
+ // Validate inputs
+ if offset < -1 {
+ return ErrInvalidOffset
+ }
+ if partition < 0 {
+ return ErrInvalidPartition
+ }
+
+ offsetPath := f.getOffsetPath(group, topic, partition)
+
+ // Create position structure
+ position := &KafkaConsumerPosition{
+ Type: "offset",
+ Value: offset,
+ CommittedAt: time.Now().UnixMilli(),
+ Metadata: metadata,
+ }
+
+ // Marshal to JSON
+ jsonBytes, err := json.Marshal(position)
+ if err != nil {
+ return fmt.Errorf("failed to marshal offset to JSON: %w", err)
+ }
+
+ // Store as single JSON file
+ if err := f.writeFile(offsetPath, jsonBytes); err != nil {
+ return fmt.Errorf("failed to write offset: %w", err)
+ }
+
+ return nil
+}
+
+// FetchOffset fetches the committed offset for a consumer group
+func (f *FilerStorage) FetchOffset(group, topic string, partition int32) (int64, string, error) {
+ if f.closed {
+ return -1, "", ErrStorageClosed
+ }
+
+ offsetPath := f.getOffsetPath(group, topic, partition)
+
+ // Read offset file
+ offsetData, err := f.readFile(offsetPath)
+ if err != nil {
+		// Treat any lookup failure as "no offset committed"; the file
+		// normally does not exist before the first commit
+ return -1, "", nil
+ }
+
+ // Parse JSON format
+ var position KafkaConsumerPosition
+ if err := json.Unmarshal(offsetData, &position); err != nil {
+ return -1, "", fmt.Errorf("failed to parse offset JSON: %w", err)
+ }
+
+ return position.Value, position.Metadata, nil
+}
+
+// FetchAllOffsets fetches all committed offsets for a consumer group
+func (f *FilerStorage) FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error) {
+ if f.closed {
+ return nil, ErrStorageClosed
+ }
+
+ result := make(map[TopicPartition]OffsetMetadata)
+ groupPath := f.getGroupPath(group)
+
+ // List all topics for this group
+ topics, err := f.listDirectory(groupPath)
+ if err != nil {
+		// Treat a listing failure as an absent group and return an empty map
+ return result, nil
+ }
+
+ // For each topic, list all partitions
+ for _, topicName := range topics {
+ topicPath := fmt.Sprintf("%s/%s", groupPath, topicName)
+ partitions, err := f.listDirectory(topicPath)
+ if err != nil {
+ continue
+ }
+
+ // For each partition, read the offset
+ for _, partitionName := range partitions {
+ var partition int32
+ _, err := fmt.Sscanf(partitionName, "%d", &partition)
+ if err != nil {
+ continue
+ }
+
+ offset, metadata, err := f.FetchOffset(group, topicName, partition)
+ if err == nil && offset >= 0 {
+ tp := TopicPartition{Topic: topicName, Partition: partition}
+ result[tp] = OffsetMetadata{Offset: offset, Metadata: metadata}
+ }
+ }
+ }
+
+ return result, nil
+}
+
+// DeleteGroup deletes all offset data for a consumer group
+func (f *FilerStorage) DeleteGroup(group string) error {
+ if f.closed {
+ return ErrStorageClosed
+ }
+
+ groupPath := f.getGroupPath(group)
+ return f.deleteDirectory(groupPath)
+}
+
+// ListGroups returns all consumer group IDs
+func (f *FilerStorage) ListGroups() ([]string, error) {
+ if f.closed {
+ return nil, ErrStorageClosed
+ }
+
+ basePath := "/kafka/consumer_offsets"
+ return f.listDirectory(basePath)
+}
+
+// Close releases resources
+func (f *FilerStorage) Close() error {
+ f.closed = true
+ return nil
+}
+
+// Helper methods
+
+func (f *FilerStorage) getGroupPath(group string) string {
+ return fmt.Sprintf("/kafka/consumer_offsets/%s", group)
+}
+
+func (f *FilerStorage) getTopicPath(group, topic string) string {
+ return fmt.Sprintf("%s/%s", f.getGroupPath(group), topic)
+}
+
+func (f *FilerStorage) getPartitionPath(group, topic string, partition int32) string {
+ return fmt.Sprintf("%s/%d", f.getTopicPath(group, topic), partition)
+}
+
+func (f *FilerStorage) getOffsetPath(group, topic string, partition int32) string {
+ return fmt.Sprintf("%s/offset", f.getPartitionPath(group, topic, partition))
+}
+
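+// getMetadataPath is currently unused by CommitOffset: metadata is embedded
+// in the offset JSON document rather than written to a separate file.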
+func (f *FilerStorage) getMetadataPath(group, topic string, partition int32) string {
+ return fmt.Sprintf("%s/metadata", f.getPartitionPath(group, topic, partition))
+}
+
+func (f *FilerStorage) writeFile(path string, data []byte) error {
+ fullPath := util.FullPath(path)
+ dir, name := fullPath.DirAndName()
+
+ return f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Create entry
+ entry := &filer_pb.Entry{
+ Name: name,
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ FileMode: 0644,
+ FileSize: uint64(len(data)),
+ },
+ Chunks: []*filer_pb.FileChunk{},
+ }
+
+		// Offset documents are tiny, so the content is stored inline
+ if len(data) > 0 {
+ entry.Content = data
+ }
+
+ // Create or update the entry
+ return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ })
+}
+
+func (f *FilerStorage) readFile(path string) ([]byte, error) {
+ fullPath := util.FullPath(path)
+ dir, name := fullPath.DirAndName()
+
+ var data []byte
+ err := f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Get the entry
+ resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
+ })
+ if err != nil {
+ return err
+ }
+
+ entry := resp.Entry
+ if entry.IsDirectory {
+ return fmt.Errorf("path is a directory")
+ }
+
+ // Read inline content if available
+ if len(entry.Content) > 0 {
+ data = entry.Content
+ return nil
+ }
+
+ // If no chunks, file is empty
+ if len(entry.Chunks) == 0 {
+ data = []byte{}
+ return nil
+ }
+
+ return fmt.Errorf("chunked files not supported for offset storage")
+ })
+
+ return data, err
+}
+
+func (f *FilerStorage) listDirectory(path string) ([]string, error) {
+ var entries []string
+
+ err := f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
+ Directory: path,
+ })
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return err
+ }
+
+ if resp.Entry.IsDirectory {
+ entries = append(entries, resp.Entry.Name)
+ }
+ }
+
+ return nil
+ })
+
+ return entries, err
+}
+
+func (f *FilerStorage) deleteDirectory(path string) error {
+ fullPath := util.FullPath(path)
+ dir, name := fullPath.DirAndName()
+
+ return f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
+ Directory: dir,
+ Name: name,
+ IsDeleteData: true,
+ IsRecursive: true,
+ IgnoreRecursiveError: true,
+ })
+ return err
+ })
+}
+
+// normalizePath removes leading/trailing slashes and collapses multiple slashes
+func normalizePath(path string) string {
+ path = strings.Trim(path, "/")
+ parts := strings.Split(path, "/")
+ normalized := []string{}
+ for _, part := range parts {
+ if part != "" {
+ normalized = append(normalized, part)
+ }
+ }
+ return "/" + strings.Join(normalized, "/")
+}
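
For reference, a minimal sketch of the JSON document that CommitOffset serializes and writes to the filer. The group, topic, partition, and offset values below are illustrative, and the struct simply mirrors KafkaConsumerPosition above:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Mirrors KafkaConsumerPosition from filer_storage.go above.
type KafkaConsumerPosition struct {
	Type        string `json:"type"`
	Value       int64  `json:"value"`
	CommittedAt int64  `json:"committed_at"`
	Metadata    string `json:"metadata"`
}

func main() {
	// What CommitOffset("my-group", "clicks", 3, 42, "checkpoint-7") would
	// write to /kafka/consumer_offsets/my-group/clicks/3/offset.
	pos := KafkaConsumerPosition{
		Type:        "offset",
		Value:       42,
		CommittedAt: time.Now().UnixMilli(),
		Metadata:    "checkpoint-7",
	}
	b, err := json.Marshal(pos)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// e.g. {"type":"offset","value":42,"committed_at":1712345678901,"metadata":"checkpoint-7"}
}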
diff --git a/weed/mq/kafka/consumer_offset/filer_storage_test.go b/weed/mq/kafka/consumer_offset/filer_storage_test.go
new file mode 100644
index 000000000..6f2f533c5
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/filer_storage_test.go
@@ -0,0 +1,66 @@
+package consumer_offset
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// Note: These tests require a running filer instance. They are skipped via
+// t.Skip for now and are intended to run as integration tests (e.g. with
+// go test -tags=integration) once a build tag and test infrastructure exist.
+
+func TestFilerStorageCommitAndFetch(t *testing.T) {
+ t.Skip("Requires running filer - integration test")
+
+ // This will be implemented once we have test infrastructure
+ // Test will:
+ // 1. Create filer storage
+ // 2. Commit offset
+ // 3. Fetch offset
+ // 4. Verify values match
+}
+
+func TestFilerStoragePersistence(t *testing.T) {
+ t.Skip("Requires running filer - integration test")
+
+ // Test will:
+ // 1. Commit offset with first storage instance
+ // 2. Close first instance
+ // 3. Create new storage instance
+ // 4. Fetch offset and verify it persisted
+}
+
+func TestFilerStorageMultipleGroups(t *testing.T) {
+ t.Skip("Requires running filer - integration test")
+
+ // Test will:
+ // 1. Commit offsets for multiple groups
+ // 2. Fetch all offsets per group
+ // 3. Verify isolation between groups
+}
+
+func TestFilerStoragePath(t *testing.T) {
+ // Test path generation (doesn't require filer)
+ storage := &FilerStorage{}
+
+ group := "test-group"
+ topic := "test-topic"
+ partition := int32(5)
+
+ groupPath := storage.getGroupPath(group)
+ assert.Equal(t, "/kafka/consumer_offsets/test-group", groupPath)
+
+ topicPath := storage.getTopicPath(group, topic)
+ assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic", topicPath)
+
+ partitionPath := storage.getPartitionPath(group, topic, partition)
+ assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5", partitionPath)
+
+ offsetPath := storage.getOffsetPath(group, topic, partition)
+ assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/offset", offsetPath)
+
+ metadataPath := storage.getMetadataPath(group, topic, partition)
+ assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/metadata", metadataPath)
+}
+
diff --git a/weed/mq/kafka/consumer_offset/memory_storage.go b/weed/mq/kafka/consumer_offset/memory_storage.go
new file mode 100644
index 000000000..8814107bb
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/memory_storage.go
@@ -0,0 +1,145 @@
+package consumer_offset
+
+import (
+ "sync"
+)
+
+// MemoryStorage implements OffsetStorage using in-memory maps
+// This is suitable for testing and single-node deployments
+// Data is lost on restart
+type MemoryStorage struct {
+ mu sync.RWMutex
+ groups map[string]map[TopicPartition]OffsetMetadata
+ closed bool
+}
+
+// NewMemoryStorage creates a new in-memory offset storage
+func NewMemoryStorage() *MemoryStorage {
+ return &MemoryStorage{
+ groups: make(map[string]map[TopicPartition]OffsetMetadata),
+ closed: false,
+ }
+}
+
+// CommitOffset commits an offset for a consumer group
+func (m *MemoryStorage) CommitOffset(group, topic string, partition int32, offset int64, metadata string) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.closed {
+ return ErrStorageClosed
+ }
+
+ // Validate inputs
+ if offset < -1 {
+ return ErrInvalidOffset
+ }
+ if partition < 0 {
+ return ErrInvalidPartition
+ }
+
+ // Create group if it doesn't exist
+ if m.groups[group] == nil {
+ m.groups[group] = make(map[TopicPartition]OffsetMetadata)
+ }
+
+ // Store offset
+ tp := TopicPartition{Topic: topic, Partition: partition}
+ m.groups[group][tp] = OffsetMetadata{
+ Offset: offset,
+ Metadata: metadata,
+ }
+
+ return nil
+}
+
+// FetchOffset fetches the committed offset for a consumer group
+func (m *MemoryStorage) FetchOffset(group, topic string, partition int32) (int64, string, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ if m.closed {
+ return -1, "", ErrStorageClosed
+ }
+
+ groupOffsets, exists := m.groups[group]
+ if !exists {
+ // Group doesn't exist, return -1 (no committed offset)
+ return -1, "", nil
+ }
+
+ tp := TopicPartition{Topic: topic, Partition: partition}
+ offsetMeta, exists := groupOffsets[tp]
+ if !exists {
+ // No offset committed for this partition
+ return -1, "", nil
+ }
+
+ return offsetMeta.Offset, offsetMeta.Metadata, nil
+}
+
+// FetchAllOffsets fetches all committed offsets for a consumer group
+func (m *MemoryStorage) FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ if m.closed {
+ return nil, ErrStorageClosed
+ }
+
+ groupOffsets, exists := m.groups[group]
+ if !exists {
+ // Return empty map for non-existent group
+ return make(map[TopicPartition]OffsetMetadata), nil
+ }
+
+ // Return a copy to prevent external modification
+ result := make(map[TopicPartition]OffsetMetadata, len(groupOffsets))
+ for tp, offset := range groupOffsets {
+ result[tp] = offset
+ }
+
+ return result, nil
+}
+
+// DeleteGroup deletes all offset data for a consumer group
+func (m *MemoryStorage) DeleteGroup(group string) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.closed {
+ return ErrStorageClosed
+ }
+
+ delete(m.groups, group)
+ return nil
+}
+
+// ListGroups returns all consumer group IDs
+func (m *MemoryStorage) ListGroups() ([]string, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ if m.closed {
+ return nil, ErrStorageClosed
+ }
+
+ groups := make([]string, 0, len(m.groups))
+ for group := range m.groups {
+ groups = append(groups, group)
+ }
+
+ return groups, nil
+}
+
+// Close releases resources (no-op for memory storage)
+func (m *MemoryStorage) Close() error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ m.closed = true
+ m.groups = nil
+
+ return nil
+}
+
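
Because both backends implement the same OffsetStorage interface, callers can choose a backend without touching the commit/fetch paths. A minimal sketch of such wiring, assuming it lives in this package (the newOffsetStorage helper is hypothetical and not part of this change):

package consumer_offset

import (
	"github.com/seaweedfs/seaweedfs/weed/filer_client"
)

// newOffsetStorage picks a backend behind the OffsetStorage interface:
// in-memory for tests and single-node use, filer-backed for persistence.
func newOffsetStorage(fca *filer_client.FilerClientAccessor) OffsetStorage {
	if fca == nil {
		return NewMemoryStorage() // volatile: offsets are lost on restart
	}
	return NewFilerStorage(fca) // durable: stored under /kafka/consumer_offsets
}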
diff --git a/weed/mq/kafka/consumer_offset/memory_storage_test.go b/weed/mq/kafka/consumer_offset/memory_storage_test.go
new file mode 100644
index 000000000..eaf849dc5
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/memory_storage_test.go
@@ -0,0 +1,209 @@
+package consumer_offset
+
+import (
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestMemoryStorageCommitAndFetch(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ group := "test-group"
+ topic := "test-topic"
+ partition := int32(0)
+ offset := int64(42)
+ metadata := "test-metadata"
+
+ // Commit offset
+ err := storage.CommitOffset(group, topic, partition, offset, metadata)
+ require.NoError(t, err)
+
+ // Fetch offset
+ fetchedOffset, fetchedMetadata, err := storage.FetchOffset(group, topic, partition)
+ require.NoError(t, err)
+ assert.Equal(t, offset, fetchedOffset)
+ assert.Equal(t, metadata, fetchedMetadata)
+}
+
+func TestMemoryStorageFetchNonExistent(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ // Fetch offset for non-existent group
+ offset, metadata, err := storage.FetchOffset("non-existent", "topic", 0)
+ require.NoError(t, err)
+ assert.Equal(t, int64(-1), offset)
+ assert.Equal(t, "", metadata)
+}
+
+func TestMemoryStorageFetchAllOffsets(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ group := "test-group"
+
+ // Commit offsets for multiple partitions
+ err := storage.CommitOffset(group, "topic1", 0, 10, "meta1")
+ require.NoError(t, err)
+ err = storage.CommitOffset(group, "topic1", 1, 20, "meta2")
+ require.NoError(t, err)
+ err = storage.CommitOffset(group, "topic2", 0, 30, "meta3")
+ require.NoError(t, err)
+
+ // Fetch all offsets
+ offsets, err := storage.FetchAllOffsets(group)
+ require.NoError(t, err)
+ assert.Equal(t, 3, len(offsets))
+
+ // Verify each offset
+ tp1 := TopicPartition{Topic: "topic1", Partition: 0}
+ assert.Equal(t, int64(10), offsets[tp1].Offset)
+ assert.Equal(t, "meta1", offsets[tp1].Metadata)
+
+ tp2 := TopicPartition{Topic: "topic1", Partition: 1}
+ assert.Equal(t, int64(20), offsets[tp2].Offset)
+
+ tp3 := TopicPartition{Topic: "topic2", Partition: 0}
+ assert.Equal(t, int64(30), offsets[tp3].Offset)
+}
+
+func TestMemoryStorageDeleteGroup(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ group := "test-group"
+
+ // Commit offset
+ err := storage.CommitOffset(group, "topic", 0, 100, "")
+ require.NoError(t, err)
+
+ // Verify offset exists
+ offset, _, err := storage.FetchOffset(group, "topic", 0)
+ require.NoError(t, err)
+ assert.Equal(t, int64(100), offset)
+
+ // Delete group
+ err = storage.DeleteGroup(group)
+ require.NoError(t, err)
+
+ // Verify offset is gone
+ offset, _, err = storage.FetchOffset(group, "topic", 0)
+ require.NoError(t, err)
+ assert.Equal(t, int64(-1), offset)
+}
+
+func TestMemoryStorageListGroups(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ // Initially empty
+ groups, err := storage.ListGroups()
+ require.NoError(t, err)
+ assert.Equal(t, 0, len(groups))
+
+ // Commit offsets for multiple groups
+ err = storage.CommitOffset("group1", "topic", 0, 10, "")
+ require.NoError(t, err)
+ err = storage.CommitOffset("group2", "topic", 0, 20, "")
+ require.NoError(t, err)
+ err = storage.CommitOffset("group3", "topic", 0, 30, "")
+ require.NoError(t, err)
+
+ // List groups
+ groups, err = storage.ListGroups()
+ require.NoError(t, err)
+ assert.Equal(t, 3, len(groups))
+ assert.Contains(t, groups, "group1")
+ assert.Contains(t, groups, "group2")
+ assert.Contains(t, groups, "group3")
+}
+
+func TestMemoryStorageConcurrency(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ group := "concurrent-group"
+ topic := "topic"
+ numGoroutines := 100
+
+ var wg sync.WaitGroup
+ wg.Add(numGoroutines)
+
+ // Launch multiple goroutines to commit offsets concurrently
+ for i := 0; i < numGoroutines; i++ {
+ go func(partition int32, offset int64) {
+ defer wg.Done()
+ err := storage.CommitOffset(group, topic, partition, offset, "")
+ assert.NoError(t, err)
+ }(int32(i%10), int64(i))
+ }
+
+ wg.Wait()
+
+ // Verify we can fetch offsets without errors
+ offsets, err := storage.FetchAllOffsets(group)
+ require.NoError(t, err)
+ assert.Greater(t, len(offsets), 0)
+}
+
+func TestMemoryStorageInvalidInputs(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ // Invalid offset (less than -1)
+ err := storage.CommitOffset("group", "topic", 0, -2, "")
+ assert.ErrorIs(t, err, ErrInvalidOffset)
+
+ // Invalid partition (negative)
+ err = storage.CommitOffset("group", "topic", -1, 10, "")
+ assert.ErrorIs(t, err, ErrInvalidPartition)
+}
+
+func TestMemoryStorageClosedOperations(t *testing.T) {
+ storage := NewMemoryStorage()
+ storage.Close()
+
+ // Operations on closed storage should return error
+ err := storage.CommitOffset("group", "topic", 0, 10, "")
+ assert.ErrorIs(t, err, ErrStorageClosed)
+
+ _, _, err = storage.FetchOffset("group", "topic", 0)
+ assert.ErrorIs(t, err, ErrStorageClosed)
+
+ _, err = storage.FetchAllOffsets("group")
+ assert.ErrorIs(t, err, ErrStorageClosed)
+
+ err = storage.DeleteGroup("group")
+ assert.ErrorIs(t, err, ErrStorageClosed)
+
+ _, err = storage.ListGroups()
+ assert.ErrorIs(t, err, ErrStorageClosed)
+}
+
+func TestMemoryStorageOverwrite(t *testing.T) {
+ storage := NewMemoryStorage()
+ defer storage.Close()
+
+ group := "test-group"
+ topic := "topic"
+ partition := int32(0)
+
+ // Commit initial offset
+ err := storage.CommitOffset(group, topic, partition, 10, "meta1")
+ require.NoError(t, err)
+
+ // Overwrite with new offset
+ err = storage.CommitOffset(group, topic, partition, 20, "meta2")
+ require.NoError(t, err)
+
+ // Fetch should return latest offset
+ offset, metadata, err := storage.FetchOffset(group, topic, partition)
+ require.NoError(t, err)
+ assert.Equal(t, int64(20), offset)
+ assert.Equal(t, "meta2", metadata)
+}
+
diff --git a/weed/mq/kafka/consumer_offset/storage.go b/weed/mq/kafka/consumer_offset/storage.go
new file mode 100644
index 000000000..d3f999faa
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/storage.go
@@ -0,0 +1,59 @@
+package consumer_offset
+
+import (
+ "fmt"
+)
+
+// TopicPartition uniquely identifies a topic partition
+type TopicPartition struct {
+ Topic string
+ Partition int32
+}
+
+// OffsetMetadata contains offset and associated metadata
+type OffsetMetadata struct {
+ Offset int64
+ Metadata string
+}
+
+// String returns a string representation of TopicPartition
+func (tp TopicPartition) String() string {
+ return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
+}
+
+// OffsetStorage defines the interface for storing and retrieving consumer offsets
+type OffsetStorage interface {
+ // CommitOffset commits an offset for a consumer group, topic, and partition
+ // offset is the next offset to read (Kafka convention)
+ // metadata is optional application-specific data
+ CommitOffset(group, topic string, partition int32, offset int64, metadata string) error
+
+ // FetchOffset fetches the committed offset for a consumer group, topic, and partition
+	// Returns -1 with empty metadata and a nil error when no offset has been
+	// committed; an implementation may instead return an error if the group
+	// or topic does not exist
+ FetchOffset(group, topic string, partition int32) (int64, string, error)
+
+ // FetchAllOffsets fetches all committed offsets for a consumer group
+ // Returns map of TopicPartition to OffsetMetadata
+ // Returns empty map if group doesn't exist
+ FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error)
+
+ // DeleteGroup deletes all offset data for a consumer group
+ DeleteGroup(group string) error
+
+ // ListGroups returns all consumer group IDs
+ ListGroups() ([]string, error)
+
+ // Close releases any resources held by the storage
+ Close() error
+}
+
+// Common errors
+var (
+ ErrGroupNotFound = fmt.Errorf("consumer group not found")
+ ErrOffsetNotFound = fmt.Errorf("offset not found")
+ ErrInvalidOffset = fmt.Errorf("invalid offset value")
+ ErrInvalidPartition = fmt.Errorf("invalid partition")
+ ErrStorageClosed = fmt.Errorf("storage is closed")
+)
+
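
To make the interface contract concrete, a small usage sketch against the in-memory backend (the group and topic names are illustrative):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer_offset"
)

func main() {
	var storage consumer_offset.OffsetStorage = consumer_offset.NewMemoryStorage()
	defer storage.Close()

	// Kafka convention: commit the NEXT offset to read, so after
	// processing records 0..41 the consumer commits 42.
	if err := storage.CommitOffset("my-group", "clicks", 0, 42, "checkpoint"); err != nil {
		panic(err)
	}

	// On rejoin, the consumer resumes from the committed position; -1 means
	// nothing was committed and the client falls back to auto.offset.reset.
	next, meta, err := storage.FetchOffset("my-group", "clicks", 0)
	if err != nil {
		panic(err)
	}
	fmt.Println(next, meta) // 42 checkpoint
}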