Diffstat (limited to 'weed/mq/kafka/consumer_offset')
-rw-r--r--  weed/mq/kafka/consumer_offset/filer_storage.go       | 322
-rw-r--r--  weed/mq/kafka/consumer_offset/filer_storage_test.go  |  66
-rw-r--r--  weed/mq/kafka/consumer_offset/memory_storage.go      | 145
-rw-r--r--  weed/mq/kafka/consumer_offset/memory_storage_test.go | 209
-rw-r--r--  weed/mq/kafka/consumer_offset/storage.go             |  59
5 files changed, 801 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer_offset/filer_storage.go b/weed/mq/kafka/consumer_offset/filer_storage.go
new file mode 100644
index 000000000..6edc9d5aa
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/filer_storage.go
@@ -0,0 +1,322 @@
+package consumer_offset
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "io"
+    "strings"
+    "time"
+
+    "github.com/seaweedfs/seaweedfs/weed/filer_client"
+    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+    "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// KafkaConsumerPosition represents a Kafka consumer's position
+// Can be either offset-based or timestamp-based
+type KafkaConsumerPosition struct {
+    Type        string `json:"type"`         // "offset" or "timestamp"
+    Value       int64  `json:"value"`        // The actual offset or timestamp value
+    CommittedAt int64  `json:"committed_at"` // Unix timestamp in milliseconds when committed
+    Metadata    string `json:"metadata"`     // Optional: application-specific metadata
+}
+
+// FilerStorage implements OffsetStorage using SeaweedFS filer
+// Offsets are stored in JSON format: /kafka/consumer_offsets/{group}/{topic}/{partition}/offset
+// Supports both offset and timestamp positioning
+type FilerStorage struct {
+    fca    *filer_client.FilerClientAccessor
+    closed bool
+}
+
+// NewFilerStorage creates a new filer-based offset storage
+func NewFilerStorage(fca *filer_client.FilerClientAccessor) *FilerStorage {
+    return &FilerStorage{
+        fca:    fca,
+        closed: false,
+    }
+}
+
+// CommitOffset commits an offset for a consumer group
+// Now stores as JSON to support both offset and timestamp positioning
+func (f *FilerStorage) CommitOffset(group, topic string, partition int32, offset int64, metadata string) error {
+    if f.closed {
+        return ErrStorageClosed
+    }
+
+    // Validate inputs
+    if offset < -1 {
+        return ErrInvalidOffset
+    }
+    if partition < 0 {
+        return ErrInvalidPartition
+    }
+
+    offsetPath := f.getOffsetPath(group, topic, partition)
+
+    // Create position structure
+    position := &KafkaConsumerPosition{
+        Type:        "offset",
+        Value:       offset,
+        CommittedAt: time.Now().UnixMilli(),
+        Metadata:    metadata,
+    }
+
+    // Marshal to JSON
+    jsonBytes, err := json.Marshal(position)
+    if err != nil {
+        return fmt.Errorf("failed to marshal offset to JSON: %w", err)
+    }
+
+    // Store as single JSON file
+    if err := f.writeFile(offsetPath, jsonBytes); err != nil {
+        return fmt.Errorf("failed to write offset: %w", err)
+    }
+
+    return nil
+}
+
+// FetchOffset fetches the committed offset for a consumer group
+func (f *FilerStorage) FetchOffset(group, topic string, partition int32) (int64, string, error) {
+    if f.closed {
+        return -1, "", ErrStorageClosed
+    }
+
+    offsetPath := f.getOffsetPath(group, topic, partition)
+
+    // Read offset file
+    offsetData, err := f.readFile(offsetPath)
+    if err != nil {
+        // File doesn't exist, no offset committed
+        return -1, "", nil
+    }
+
+    // Parse JSON format
+    var position KafkaConsumerPosition
+    if err := json.Unmarshal(offsetData, &position); err != nil {
+        return -1, "", fmt.Errorf("failed to parse offset JSON: %w", err)
+    }
+
+    return position.Value, position.Metadata, nil
+}
+
+// FetchAllOffsets fetches all committed offsets for a consumer group
+func (f *FilerStorage) FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error) {
+    if f.closed {
+        return nil, ErrStorageClosed
+    }
+
+    result := make(map[TopicPartition]OffsetMetadata)
+    groupPath := f.getGroupPath(group)
+
+    // List all topics for this group
+    topics, err := f.listDirectory(groupPath)
+    if err != nil {
+        // Group doesn't exist, return empty map
+        return result, nil
+    }
+
+    // For each topic, list all partitions
+    for _, topicName := range topics {
+        topicPath := fmt.Sprintf("%s/%s", groupPath, topicName)
+        partitions, err := f.listDirectory(topicPath)
+        if err != nil {
+            continue
+        }
+
+        // For each partition, read the offset
+        for _, partitionName := range partitions {
+            var partition int32
+            _, err := fmt.Sscanf(partitionName, "%d", &partition)
+            if err != nil {
+                continue
+            }
+
+            offset, metadata, err := f.FetchOffset(group, topicName, partition)
+            if err == nil && offset >= 0 {
+                tp := TopicPartition{Topic: topicName, Partition: partition}
+                result[tp] = OffsetMetadata{Offset: offset, Metadata: metadata}
+            }
+        }
+    }
+
+    return result, nil
+}
+
+// DeleteGroup deletes all offset data for a consumer group
+func (f *FilerStorage) DeleteGroup(group string) error {
+    if f.closed {
+        return ErrStorageClosed
+    }
+
+    groupPath := f.getGroupPath(group)
+    return f.deleteDirectory(groupPath)
+}
+
+// ListGroups returns all consumer group IDs
+func (f *FilerStorage) ListGroups() ([]string, error) {
+    if f.closed {
+        return nil, ErrStorageClosed
+    }
+
+    basePath := "/kafka/consumer_offsets"
+    return f.listDirectory(basePath)
+}
+
+// Close releases resources
+func (f *FilerStorage) Close() error {
+    f.closed = true
+    return nil
+}
+
+// Helper methods
+
+func (f *FilerStorage) getGroupPath(group string) string {
+    return fmt.Sprintf("/kafka/consumer_offsets/%s", group)
+}
+
+func (f *FilerStorage) getTopicPath(group, topic string) string {
+    return fmt.Sprintf("%s/%s", f.getGroupPath(group), topic)
+}
+
+func (f *FilerStorage) getPartitionPath(group, topic string, partition int32) string {
+    return fmt.Sprintf("%s/%d", f.getTopicPath(group, topic), partition)
+}
+
+func (f *FilerStorage) getOffsetPath(group, topic string, partition int32) string {
+    return fmt.Sprintf("%s/offset", f.getPartitionPath(group, topic, partition))
+}
+
+func (f *FilerStorage) getMetadataPath(group, topic string, partition int32) string {
+    return fmt.Sprintf("%s/metadata", f.getPartitionPath(group, topic, partition))
+}
+
+func (f *FilerStorage) writeFile(path string, data []byte) error {
+    fullPath := util.FullPath(path)
+    dir, name := fullPath.DirAndName()
+
+    return f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+        // Create entry
+        entry := &filer_pb.Entry{
+            Name:        name,
+            IsDirectory: false,
+            Attributes: &filer_pb.FuseAttributes{
+                Crtime:   time.Now().Unix(),
+                Mtime:    time.Now().Unix(),
+                FileMode: 0644,
+                FileSize: uint64(len(data)),
+            },
+            Chunks: []*filer_pb.FileChunk{},
+        }
+
+        // For small files, store inline
+        if len(data) > 0 {
+            entry.Content = data
+        }
+
+        // Create or update the entry
+        return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
+            Directory: dir,
+            Entry:     entry,
+        })
+    })
+}
+
+func (f *FilerStorage) readFile(path string) ([]byte, error) {
+    fullPath := util.FullPath(path)
+    dir, name := fullPath.DirAndName()
+
+    var data []byte
+    err := f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+        // Get the entry
+        resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+            Directory: dir,
+            Name:      name,
+        })
+        if err != nil {
+            return err
+        }
+
+        entry := resp.Entry
+        if entry.IsDirectory {
+            return fmt.Errorf("path is a directory")
+        }
+
+        // Read inline content if available
+        if len(entry.Content) > 0 {
+            data = entry.Content
+            return nil
+        }
+
+        // If no chunks, file is empty
+        if len(entry.Chunks) == 0 {
+            data = []byte{}
+            return nil
+        }
+
+        return fmt.Errorf("chunked files not supported for offset storage")
+    })
+
+    return data, err
+}
+
+func (f *FilerStorage) listDirectory(path string) ([]string, error) {
+    var entries []string
+
+    err := f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+        stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
+            Directory: path,
+        })
+        if err != nil {
+            return err
+        }
+
+        for {
+            resp, err := stream.Recv()
+            if err == io.EOF {
+                break
+            }
+            if err != nil {
+                return err
+            }
+
+            if resp.Entry.IsDirectory {
+                entries = append(entries, resp.Entry.Name)
+            }
+        }
+
+        return nil
+    })
+
+    return entries, err
+}
+
+func (f *FilerStorage) deleteDirectory(path string) error {
+    fullPath := util.FullPath(path)
+    dir, name := fullPath.DirAndName()
+
+    return f.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+        _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
+            Directory:            dir,
+            Name:                 name,
+            IsDeleteData:         true,
+            IsRecursive:          true,
+            IgnoreRecursiveError: true,
+        })
+        return err
+    })
+}
+
+// normalizePath removes leading/trailing slashes and collapses multiple slashes
+func normalizePath(path string) string {
+    path = strings.Trim(path, "/")
+    parts := strings.Split(path, "/")
+    normalized := []string{}
+    for _, part := range parts {
+        if part != "" {
+            normalized = append(normalized, part)
+        }
+    }
+    return "/" + strings.Join(normalized, "/")
+}
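
For reference, CommitOffset writes each position as a small inline JSON file,
e.g. at /kafka/consumer_offsets/my-group/my-topic/0/offset. A minimal sketch of
the resulting document, mirroring the KafkaConsumerPosition struct above (the
committed_at value and metadata string are illustrative, not from the commit):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // position mirrors KafkaConsumerPosition from filer_storage.go.
    type position struct {
        Type        string `json:"type"`
        Value       int64  `json:"value"`
        CommittedAt int64  `json:"committed_at"`
        Metadata    string `json:"metadata"`
    }

    func main() {
        b, _ := json.Marshal(position{
            Type:        "offset",
            Value:       42,
            CommittedAt: 1700000000000, // illustrative UnixMilli timestamp
            Metadata:    "checkpoint-7",
        })
        fmt.Println(string(b))
        // {"type":"offset","value":42,"committed_at":1700000000000,"metadata":"checkpoint-7"}
    }
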
diff --git a/weed/mq/kafka/consumer_offset/filer_storage_test.go b/weed/mq/kafka/consumer_offset/filer_storage_test.go
new file mode 100644
index 000000000..6f2f533c5
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/filer_storage_test.go
@@ -0,0 +1,66 @@
+package consumer_offset
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+)
+
+// Note: These tests require a running filer instance
+// They are marked as integration tests and should be run with:
+//   go test -tags=integration
+
+func TestFilerStorageCommitAndFetch(t *testing.T) {
+    t.Skip("Requires running filer - integration test")
+
+    // This will be implemented once we have test infrastructure
+    // Test will:
+    // 1. Create filer storage
+    // 2. Commit offset
+    // 3. Fetch offset
+    // 4. Verify values match
+}
+
+func TestFilerStoragePersistence(t *testing.T) {
+    t.Skip("Requires running filer - integration test")
+
+    // Test will:
+    // 1. Commit offset with first storage instance
+    // 2. Close first instance
+    // 3. Create new storage instance
+    // 4. Fetch offset and verify it persisted
+}
+
+func TestFilerStorageMultipleGroups(t *testing.T) {
+    t.Skip("Requires running filer - integration test")
+
+    // Test will:
+    // 1. Commit offsets for multiple groups
+    // 2. Fetch all offsets per group
+    // 3. Verify isolation between groups
+}
+
+func TestFilerStoragePath(t *testing.T) {
+    // Test path generation (doesn't require filer)
+    storage := &FilerStorage{}
+
+    group := "test-group"
+    topic := "test-topic"
+    partition := int32(5)
+
+    groupPath := storage.getGroupPath(group)
+    assert.Equal(t, "/kafka/consumer_offsets/test-group", groupPath)
+
+    topicPath := storage.getTopicPath(group, topic)
+    assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic", topicPath)
+
+    partitionPath := storage.getPartitionPath(group, topic, partition)
+    assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5", partitionPath)
+
+    offsetPath := storage.getOffsetPath(group, topic, partition)
+    assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/offset", offsetPath)
+
+    metadataPath := storage.getMetadataPath(group, topic, partition)
+    assert.Equal(t, "/kafka/consumer_offsets/test-group/test-topic/5/metadata", metadataPath)
+}
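
Once integration-test infrastructure exists, the commit-and-fetch case could
look like the sketch below; newTestFilerAccessor is a hypothetical helper that
returns a FilerClientAccessor wired to a local filer:

    func TestFilerStorageCommitAndFetchIntegration(t *testing.T) {
        fca := newTestFilerAccessor(t) // hypothetical: dials a local filer
        storage := NewFilerStorage(fca)
        defer storage.Close()

        require.NoError(t, storage.CommitOffset("it-group", "it-topic", 0, 42, "meta"))

        offset, metadata, err := storage.FetchOffset("it-group", "it-topic", 0)
        require.NoError(t, err)
        assert.Equal(t, int64(42), offset)
        assert.Equal(t, "meta", metadata)
    }
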
diff --git a/weed/mq/kafka/consumer_offset/memory_storage.go b/weed/mq/kafka/consumer_offset/memory_storage.go
new file mode 100644
index 000000000..8814107bb
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/memory_storage.go
@@ -0,0 +1,145 @@
+package consumer_offset
+
+import (
+    "sync"
+)
+
+// MemoryStorage implements OffsetStorage using in-memory maps
+// This is suitable for testing and single-node deployments
+// Data is lost on restart
+type MemoryStorage struct {
+    mu     sync.RWMutex
+    groups map[string]map[TopicPartition]OffsetMetadata
+    closed bool
+}
+
+// NewMemoryStorage creates a new in-memory offset storage
+func NewMemoryStorage() *MemoryStorage {
+    return &MemoryStorage{
+        groups: make(map[string]map[TopicPartition]OffsetMetadata),
+        closed: false,
+    }
+}
+
+// CommitOffset commits an offset for a consumer group
+func (m *MemoryStorage) CommitOffset(group, topic string, partition int32, offset int64, metadata string) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    if m.closed {
+        return ErrStorageClosed
+    }
+
+    // Validate inputs
+    if offset < -1 {
+        return ErrInvalidOffset
+    }
+    if partition < 0 {
+        return ErrInvalidPartition
+    }
+
+    // Create group if it doesn't exist
+    if m.groups[group] == nil {
+        m.groups[group] = make(map[TopicPartition]OffsetMetadata)
+    }
+
+    // Store offset
+    tp := TopicPartition{Topic: topic, Partition: partition}
+    m.groups[group][tp] = OffsetMetadata{
+        Offset:   offset,
+        Metadata: metadata,
+    }
+
+    return nil
+}
+
+// FetchOffset fetches the committed offset for a consumer group
+func (m *MemoryStorage) FetchOffset(group, topic string, partition int32) (int64, string, error) {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    if m.closed {
+        return -1, "", ErrStorageClosed
+    }
+
+    groupOffsets, exists := m.groups[group]
+    if !exists {
+        // Group doesn't exist, return -1 (no committed offset)
+        return -1, "", nil
+    }
+
+    tp := TopicPartition{Topic: topic, Partition: partition}
+    offsetMeta, exists := groupOffsets[tp]
+    if !exists {
+        // No offset committed for this partition
+        return -1, "", nil
+    }
+
+    return offsetMeta.Offset, offsetMeta.Metadata, nil
+}
+
+// FetchAllOffsets fetches all committed offsets for a consumer group
+func (m *MemoryStorage) FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error) {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    if m.closed {
+        return nil, ErrStorageClosed
+    }
+
+    groupOffsets, exists := m.groups[group]
+    if !exists {
+        // Return empty map for non-existent group
+        return make(map[TopicPartition]OffsetMetadata), nil
+    }
+
+    // Return a copy to prevent external modification
+    result := make(map[TopicPartition]OffsetMetadata, len(groupOffsets))
+    for tp, offset := range groupOffsets {
+        result[tp] = offset
+    }
+
+    return result, nil
+}
+
+// DeleteGroup deletes all offset data for a consumer group
+func (m *MemoryStorage) DeleteGroup(group string) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    if m.closed {
+        return ErrStorageClosed
+    }
+
+    delete(m.groups, group)
+    return nil
+}
+
+// ListGroups returns all consumer group IDs
+func (m *MemoryStorage) ListGroups() ([]string, error) {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    if m.closed {
+        return nil, ErrStorageClosed
+    }
+
+    groups := make([]string, 0, len(m.groups))
+    for group := range m.groups {
+        groups = append(groups, group)
+    }
+
+    return groups, nil
+}
+
+// Close releases resources (no-op for memory storage)
+func (m *MemoryStorage) Close() error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    m.closed = true
+    m.groups = nil
+
+    return nil
+}
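
Note that FetchAllOffsets hands back a copy of the group's map, so callers can
mutate the result without touching the store. A quick sketch, assuming the
package's standard import path under the seaweedfs module:

    package main

    import (
        "fmt"

        consumer_offset "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer_offset"
    )

    func main() {
        storage := consumer_offset.NewMemoryStorage()
        defer storage.Close()

        _ = storage.CommitOffset("g", "t", 0, 10, "")

        // Deleting from the returned copy does not affect the store.
        offsets, _ := storage.FetchAllOffsets("g")
        delete(offsets, consumer_offset.TopicPartition{Topic: "t", Partition: 0})

        offset, _, _ := storage.FetchOffset("g", "t", 0)
        fmt.Println(offset) // still prints 10
    }
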
diff --git a/weed/mq/kafka/consumer_offset/memory_storage_test.go b/weed/mq/kafka/consumer_offset/memory_storage_test.go
new file mode 100644
index 000000000..eaf849dc5
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/memory_storage_test.go
@@ -0,0 +1,209 @@
+package consumer_offset
+
+import (
+    "sync"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+func TestMemoryStorageCommitAndFetch(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    group := "test-group"
+    topic := "test-topic"
+    partition := int32(0)
+    offset := int64(42)
+    metadata := "test-metadata"
+
+    // Commit offset
+    err := storage.CommitOffset(group, topic, partition, offset, metadata)
+    require.NoError(t, err)
+
+    // Fetch offset
+    fetchedOffset, fetchedMetadata, err := storage.FetchOffset(group, topic, partition)
+    require.NoError(t, err)
+    assert.Equal(t, offset, fetchedOffset)
+    assert.Equal(t, metadata, fetchedMetadata)
+}
+
+func TestMemoryStorageFetchNonExistent(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    // Fetch offset for non-existent group
+    offset, metadata, err := storage.FetchOffset("non-existent", "topic", 0)
+    require.NoError(t, err)
+    assert.Equal(t, int64(-1), offset)
+    assert.Equal(t, "", metadata)
+}
+
+func TestMemoryStorageFetchAllOffsets(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    group := "test-group"
+
+    // Commit offsets for multiple partitions
+    err := storage.CommitOffset(group, "topic1", 0, 10, "meta1")
+    require.NoError(t, err)
+    err = storage.CommitOffset(group, "topic1", 1, 20, "meta2")
+    require.NoError(t, err)
+    err = storage.CommitOffset(group, "topic2", 0, 30, "meta3")
+    require.NoError(t, err)
+
+    // Fetch all offsets
+    offsets, err := storage.FetchAllOffsets(group)
+    require.NoError(t, err)
+    assert.Equal(t, 3, len(offsets))
+
+    // Verify each offset
+    tp1 := TopicPartition{Topic: "topic1", Partition: 0}
+    assert.Equal(t, int64(10), offsets[tp1].Offset)
+    assert.Equal(t, "meta1", offsets[tp1].Metadata)
+
+    tp2 := TopicPartition{Topic: "topic1", Partition: 1}
+    assert.Equal(t, int64(20), offsets[tp2].Offset)
+
+    tp3 := TopicPartition{Topic: "topic2", Partition: 0}
+    assert.Equal(t, int64(30), offsets[tp3].Offset)
+}
+
+func TestMemoryStorageDeleteGroup(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    group := "test-group"
+
+    // Commit offset
+    err := storage.CommitOffset(group, "topic", 0, 100, "")
+    require.NoError(t, err)
+
+    // Verify offset exists
+    offset, _, err := storage.FetchOffset(group, "topic", 0)
+    require.NoError(t, err)
+    assert.Equal(t, int64(100), offset)
+
+    // Delete group
+    err = storage.DeleteGroup(group)
+    require.NoError(t, err)
+
+    // Verify offset is gone
+    offset, _, err = storage.FetchOffset(group, "topic", 0)
+    require.NoError(t, err)
+    assert.Equal(t, int64(-1), offset)
+}
+
+func TestMemoryStorageListGroups(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    // Initially empty
+    groups, err := storage.ListGroups()
+    require.NoError(t, err)
+    assert.Equal(t, 0, len(groups))
+
+    // Commit offsets for multiple groups
+    err = storage.CommitOffset("group1", "topic", 0, 10, "")
+    require.NoError(t, err)
+    err = storage.CommitOffset("group2", "topic", 0, 20, "")
+    require.NoError(t, err)
+    err = storage.CommitOffset("group3", "topic", 0, 30, "")
+    require.NoError(t, err)
+
+    // List groups
+    groups, err = storage.ListGroups()
+    require.NoError(t, err)
+    assert.Equal(t, 3, len(groups))
+    assert.Contains(t, groups, "group1")
+    assert.Contains(t, groups, "group2")
+    assert.Contains(t, groups, "group3")
+}
+
+func TestMemoryStorageConcurrency(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    group := "concurrent-group"
+    topic := "topic"
+    numGoroutines := 100
+
+    var wg sync.WaitGroup
+    wg.Add(numGoroutines)
+
+    // Launch multiple goroutines to commit offsets concurrently
+    for i := 0; i < numGoroutines; i++ {
+        go func(partition int32, offset int64) {
+            defer wg.Done()
+            err := storage.CommitOffset(group, topic, partition, offset, "")
+            assert.NoError(t, err)
+        }(int32(i%10), int64(i))
+    }
+
+    wg.Wait()
+
+    // Verify we can fetch offsets without errors
+    offsets, err := storage.FetchAllOffsets(group)
+    require.NoError(t, err)
+    assert.Greater(t, len(offsets), 0)
+}
+
+func TestMemoryStorageInvalidInputs(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    // Invalid offset (less than -1)
+    err := storage.CommitOffset("group", "topic", 0, -2, "")
+    assert.ErrorIs(t, err, ErrInvalidOffset)
+
+    // Invalid partition (negative)
+    err = storage.CommitOffset("group", "topic", -1, 10, "")
+    assert.ErrorIs(t, err, ErrInvalidPartition)
+}
+
+func TestMemoryStorageClosedOperations(t *testing.T) {
+    storage := NewMemoryStorage()
+    storage.Close()
+
+    // Operations on closed storage should return error
+    err := storage.CommitOffset("group", "topic", 0, 10, "")
+    assert.ErrorIs(t, err, ErrStorageClosed)
+
+    _, _, err = storage.FetchOffset("group", "topic", 0)
+    assert.ErrorIs(t, err, ErrStorageClosed)
+
+    _, err = storage.FetchAllOffsets("group")
+    assert.ErrorIs(t, err, ErrStorageClosed)
+
+    err = storage.DeleteGroup("group")
+    assert.ErrorIs(t, err, ErrStorageClosed)
+
+    _, err = storage.ListGroups()
+    assert.ErrorIs(t, err, ErrStorageClosed)
+}
+
+func TestMemoryStorageOverwrite(t *testing.T) {
+    storage := NewMemoryStorage()
+    defer storage.Close()
+
+    group := "test-group"
+    topic := "topic"
+    partition := int32(0)
+
+    // Commit initial offset
+    err := storage.CommitOffset(group, topic, partition, 10, "meta1")
+    require.NoError(t, err)
+
+    // Overwrite with new offset
+    err = storage.CommitOffset(group, topic, partition, 20, "meta2")
+    require.NoError(t, err)
+
+    // Fetch should return latest offset
+    offset, metadata, err := storage.FetchOffset(group, topic, partition)
+    require.NoError(t, err)
+    assert.Equal(t, int64(20), offset)
+    assert.Equal(t, "meta2", metadata)
+}
diff --git a/weed/mq/kafka/consumer_offset/storage.go b/weed/mq/kafka/consumer_offset/storage.go
new file mode 100644
index 000000000..d3f999faa
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/storage.go
@@ -0,0 +1,59 @@
+package consumer_offset
+
+import (
+    "fmt"
+)
+
+// TopicPartition uniquely identifies a topic partition
+type TopicPartition struct {
+    Topic     string
+    Partition int32
+}
+
+// OffsetMetadata contains offset and associated metadata
+type OffsetMetadata struct {
+    Offset   int64
+    Metadata string
+}
+
+// String returns a string representation of TopicPartition
+func (tp TopicPartition) String() string {
+    return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
+}
+
+// OffsetStorage defines the interface for storing and retrieving consumer offsets
+type OffsetStorage interface {
+    // CommitOffset commits an offset for a consumer group, topic, and partition
+    // offset is the next offset to read (Kafka convention)
+    // metadata is optional application-specific data
+    CommitOffset(group, topic string, partition int32, offset int64, metadata string) error
+
+    // FetchOffset fetches the committed offset for a consumer group, topic, and partition
+    // Returns -1 if no offset has been committed
+    // Returns error if the group or topic doesn't exist (depending on implementation)
+    FetchOffset(group, topic string, partition int32) (int64, string, error)
+
+    // FetchAllOffsets fetches all committed offsets for a consumer group
+    // Returns map of TopicPartition to OffsetMetadata
+    // Returns empty map if group doesn't exist
+    FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error)
+
+    // DeleteGroup deletes all offset data for a consumer group
+    DeleteGroup(group string) error
+
+    // ListGroups returns all consumer group IDs
+    ListGroups() ([]string, error)
+
+    // Close releases any resources held by the storage
+    Close() error
+}
+
+// Common errors
+var (
+    ErrGroupNotFound    = fmt.Errorf("consumer group not found")
+    ErrOffsetNotFound   = fmt.Errorf("offset not found")
+    ErrInvalidOffset    = fmt.Errorf("invalid offset value")
+    ErrInvalidPartition = fmt.Errorf("invalid partition")
+    ErrStorageClosed    = fmt.Errorf("storage is closed")
+)
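
Because both MemoryStorage and FilerStorage satisfy OffsetStorage, callers can
be written against the interface and pick a backend at wiring time. A minimal
sketch (the enableFiler flag is a placeholder, not an existing option):

    func newOffsetStorage(enableFiler bool, fca *filer_client.FilerClientAccessor) OffsetStorage {
        if enableFiler && fca != nil {
            return NewFilerStorage(fca) // durable: offsets survive gateway restarts
        }
        return NewMemoryStorage() // volatile: for tests and single-node use
    }
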
