aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer_offset/storage.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer_offset/storage.go')
-rw-r--r--weed/mq/kafka/consumer_offset/storage.go59
1 files changed, 59 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer_offset/storage.go b/weed/mq/kafka/consumer_offset/storage.go
new file mode 100644
index 000000000..d3f999faa
--- /dev/null
+++ b/weed/mq/kafka/consumer_offset/storage.go
@@ -0,0 +1,59 @@
+package consumer_offset
+
+import (
+ "fmt"
+)
+
+// TopicPartition uniquely identifies a topic partition
+type TopicPartition struct {
+ Topic string
+ Partition int32
+}
+
+// OffsetMetadata contains offset and associated metadata
+type OffsetMetadata struct {
+ Offset int64
+ Metadata string
+}
+
+// String returns a string representation of TopicPartition
+func (tp TopicPartition) String() string {
+ return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
+}
+
+// OffsetStorage defines the interface for storing and retrieving consumer offsets
+type OffsetStorage interface {
+ // CommitOffset commits an offset for a consumer group, topic, and partition
+ // offset is the next offset to read (Kafka convention)
+ // metadata is optional application-specific data
+ CommitOffset(group, topic string, partition int32, offset int64, metadata string) error
+
+ // FetchOffset fetches the committed offset for a consumer group, topic, and partition
+ // Returns -1 if no offset has been committed
+ // Returns error if the group or topic doesn't exist (depending on implementation)
+ FetchOffset(group, topic string, partition int32) (int64, string, error)
+
+ // FetchAllOffsets fetches all committed offsets for a consumer group
+ // Returns map of TopicPartition to OffsetMetadata
+ // Returns empty map if group doesn't exist
+ FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error)
+
+ // DeleteGroup deletes all offset data for a consumer group
+ DeleteGroup(group string) error
+
+ // ListGroups returns all consumer group IDs
+ ListGroups() ([]string, error)
+
+ // Close releases any resources held by the storage
+ Close() error
+}
+
+// Common errors
+var (
+ ErrGroupNotFound = fmt.Errorf("consumer group not found")
+ ErrOffsetNotFound = fmt.Errorf("offset not found")
+ ErrInvalidOffset = fmt.Errorf("invalid offset value")
+ ErrInvalidPartition = fmt.Errorf("invalid partition")
+ ErrStorageClosed = fmt.Errorf("storage is closed")
+)
+