Diffstat (limited to 'weed/mq/offset/manager.go')
-rw-r--r-- weed/mq/offset/manager.go | 343
1 file changed, 343 insertions, 0 deletions
diff --git a/weed/mq/offset/manager.go b/weed/mq/offset/manager.go
new file mode 100644
index 000000000..01976a8bf
--- /dev/null
+++ b/weed/mq/offset/manager.go
@@ -0,0 +1,343 @@
+package offset
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// PartitionOffsetManager manages sequential offset assignment for a single partition
+type PartitionOffsetManager struct {
+ mu sync.RWMutex
+ namespace string
+ topicName string
+ partition *schema_pb.Partition
+ nextOffset int64
+
+ // Checkpointing for recovery
+ lastCheckpoint int64
+ checkpointInterval int64
+ storage OffsetStorage
+}
+
+// OffsetStorage interface for persisting offset state
+type OffsetStorage interface {
+ // SaveCheckpoint persists the current offset state for recovery
+ // Takes topic information along with partition to determine the correct storage location
+ SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error
+
+ // LoadCheckpoint retrieves the last saved offset state
+ LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error)
+
+ // GetHighestOffset scans storage to find the highest assigned offset
+ GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error)
+}
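+
+// memoryOffsetStorage is a minimal in-memory sketch of the OffsetStorage
+// interface above, intended for tests and examples rather than production use:
+// state lives in a map keyed by TopicPartitionKey and is lost on restart, and
+// GetHighestOffset simply echoes the last checkpoint since there is no real
+// storage to scan.
+type memoryOffsetStorage struct {
+	mu          sync.Mutex
+	checkpoints map[string]int64
+}
+
+func newMemoryOffsetStorage() *memoryOffsetStorage {
+	return &memoryOffsetStorage{checkpoints: make(map[string]int64)}
+}
+
+func (s *memoryOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.checkpoints[TopicPartitionKey(namespace, topicName, partition)] = offset
+	return nil
+}
+
+func (s *memoryOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if offset, ok := s.checkpoints[TopicPartitionKey(namespace, topicName, partition)]; ok {
+		return offset, nil
+	}
+	return -1, nil // -1 means "no checkpoint yet"; recover() treats it as no data
+}
+
+func (s *memoryOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+	return s.LoadCheckpoint(namespace, topicName, partition)
+}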
+
+// NewPartitionOffsetManager creates a new offset manager for a partition
+func NewPartitionOffsetManager(namespace, topicName string, partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
+ manager := &PartitionOffsetManager{
+ namespace: namespace,
+ topicName: topicName,
+ partition: partition,
+ checkpointInterval: 1, // Checkpoint every offset for immediate persistence
+ storage: storage,
+ }
+
+ // Recover offset state
+ if err := manager.recover(); err != nil {
+ return nil, fmt.Errorf("failed to recover offset state: %w", err)
+ }
+
+ return manager, nil
+}
+
+// AssignOffset assigns the next sequential offset
+func (m *PartitionOffsetManager) AssignOffset() int64 {
+ var shouldCheckpoint bool
+ var checkpointOffset int64
+
+ m.mu.Lock()
+ offset := m.nextOffset
+ m.nextOffset++
+
+ // Check if we should checkpoint (but don't do it inside the lock)
+ if offset-m.lastCheckpoint >= m.checkpointInterval {
+ shouldCheckpoint = true
+ checkpointOffset = offset
+ }
+ m.mu.Unlock()
+
+ // Checkpoint outside the lock to avoid deadlock
+ if shouldCheckpoint {
+ m.checkpoint(checkpointOffset)
+ }
+
+ return offset
+}
+
+// AssignOffsets assigns a batch of sequential offsets
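+// For example, with nextOffset currently at 10, AssignOffsets(3) returns
+// (baseOffset=10, lastOffset=12) and advances nextOffset to 13. Callers are
+// expected to pass count >= 1.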
+func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
+ var shouldCheckpoint bool
+ var checkpointOffset int64
+
+ m.mu.Lock()
+ baseOffset = m.nextOffset
+ lastOffset = m.nextOffset + count - 1
+ m.nextOffset += count
+
+ // Check if we should checkpoint (but don't do it inside the lock)
+ if lastOffset-m.lastCheckpoint >= m.checkpointInterval {
+ shouldCheckpoint = true
+ checkpointOffset = lastOffset
+ }
+ m.mu.Unlock()
+
+ // Checkpoint outside the lock to avoid deadlock
+ if shouldCheckpoint {
+ m.checkpoint(checkpointOffset)
+ }
+
+ return baseOffset, lastOffset
+}
+
+// GetNextOffset returns the next offset that will be assigned
+func (m *PartitionOffsetManager) GetNextOffset() int64 {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ return m.nextOffset
+}
+
+// GetHighWaterMark returns the high water mark: the next offset to be
+// assigned, i.e. one past the highest assigned offset.
+func (m *PartitionOffsetManager) GetHighWaterMark() int64 {
+ return m.GetNextOffset()
+}
+
+// recover restores offset state from storage
+func (m *PartitionOffsetManager) recover() error {
+ var checkpointOffset int64 = -1
+ var highestOffset int64 = -1
+
+ // Try to load checkpoint
+ if offset, err := m.storage.LoadCheckpoint(m.namespace, m.topicName, m.partition); err == nil && offset >= 0 {
+ checkpointOffset = offset
+ }
+
+ // Try to scan storage for highest offset
+ if offset, err := m.storage.GetHighestOffset(m.namespace, m.topicName, m.partition); err == nil && offset >= 0 {
+ highestOffset = offset
+ }
+
+ // Use the higher of checkpoint or storage scan
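+	// (e.g. a checkpoint of 99 with a storage scan finding 105 means offsets
+	// 100-105 were assigned after the last checkpoint, so resume at 106)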
+ if checkpointOffset >= 0 && highestOffset >= 0 {
+ if highestOffset > checkpointOffset {
+ m.nextOffset = highestOffset + 1
+ m.lastCheckpoint = highestOffset
+ } else {
+ m.nextOffset = checkpointOffset + 1
+ m.lastCheckpoint = checkpointOffset
+ }
+ } else if checkpointOffset >= 0 {
+ m.nextOffset = checkpointOffset + 1
+ m.lastCheckpoint = checkpointOffset
+ } else if highestOffset >= 0 {
+ m.nextOffset = highestOffset + 1
+ m.lastCheckpoint = highestOffset
+ } else {
+ // No data exists, start from 0
+ m.nextOffset = 0
+ m.lastCheckpoint = -1
+ }
+
+ return nil
+}
+
+// checkpoint saves the current offset state. Failures are logged and tolerated:
+// checkpoints only speed up recovery, since recover() falls back to scanning
+// storage for the highest assigned offset.
+func (m *PartitionOffsetManager) checkpoint(offset int64) {
+	if err := m.storage.SaveCheckpoint(m.namespace, m.topicName, m.partition, offset); err != nil {
+		// Log error but don't fail - checkpointing is an optimization
+		glog.Warningf("failed to checkpoint offset %d for %s: %v", offset, TopicPartitionKey(m.namespace, m.topicName, m.partition), err)
+		return
+	}
+
+	m.mu.Lock()
+	// Checkpoints complete outside the assignment lock, so they can finish out
+	// of order; never move lastCheckpoint backwards.
+	if offset > m.lastCheckpoint {
+		m.lastCheckpoint = offset
+	}
+	m.mu.Unlock()
+}
+
+// PartitionOffsetRegistry manages offset managers for multiple partitions
+type PartitionOffsetRegistry struct {
+ mu sync.RWMutex
+ managers map[string]*PartitionOffsetManager
+ storage OffsetStorage
+}
+
+// NewPartitionOffsetRegistry creates a new registry
+func NewPartitionOffsetRegistry(storage OffsetStorage) *PartitionOffsetRegistry {
+ return &PartitionOffsetRegistry{
+ managers: make(map[string]*PartitionOffsetManager),
+ storage: storage,
+ }
+}
+
+// GetManager returns the offset manager for a partition, creating it if needed
+func (r *PartitionOffsetRegistry) GetManager(namespace, topicName string, partition *schema_pb.Partition) (*PartitionOffsetManager, error) {
+	// Key by topic as well as partition so each topic gets its own offset
+	// manager; keying by partition alone would share offset state across topics.
+ key := TopicPartitionKey(namespace, topicName, partition)
+
+ r.mu.RLock()
+ manager, exists := r.managers[key]
+ r.mu.RUnlock()
+
+ if exists {
+ return manager, nil
+ }
+
+ // Create new manager
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ // Double-check after acquiring write lock
+ if manager, exists := r.managers[key]; exists {
+ return manager, nil
+ }
+
+ manager, err := NewPartitionOffsetManager(namespace, topicName, partition, r.storage)
+ if err != nil {
+ return nil, err
+ }
+
+ r.managers[key] = manager
+ return manager, nil
+}
+
+// AssignOffset assigns an offset for the given partition
+func (r *PartitionOffsetRegistry) AssignOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ manager, err := r.GetManager(namespace, topicName, partition)
+ if err != nil {
+ return 0, err
+ }
+
+	return manager.AssignOffset(), nil
+}
+
+// AssignOffsets assigns a batch of offsets for the given partition
+func (r *PartitionOffsetRegistry) AssignOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) (baseOffset, lastOffset int64, err error) {
+ manager, err := r.GetManager(namespace, topicName, partition)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ baseOffset, lastOffset = manager.AssignOffsets(count)
+ return baseOffset, lastOffset, nil
+}
+
+// GetHighWaterMark returns the high water mark for a partition
+func (r *PartitionOffsetRegistry) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ manager, err := r.GetManager(namespace, topicName, partition)
+ if err != nil {
+ return 0, err
+ }
+
+ return manager.GetHighWaterMark(), nil
+}
+
+// TopicPartitionKey generates a unique key for a topic-partition combination
+// This is the canonical key format used across the offset management system
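+// e.g. TopicPartitionKey("test", "events", p) yields
+// "test/events/ring:1024:range:0-31" for a partition spanning slots 0-31 of a
+// 1024-slot ring (values are illustrative).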
+func TopicPartitionKey(namespace, topicName string, partition *schema_pb.Partition) string {
+ return fmt.Sprintf("%s/%s/ring:%d:range:%d-%d",
+ namespace, topicName,
+ partition.RingSize, partition.RangeStart, partition.RangeStop)
+}
+
+// PartitionKey generates a unique key for a partition (without topic context).
+// Note: UnixTimeNs is intentionally excluded from the key because it represents
+// partition creation time, not partition identity. Using it would cause offset
+// tracking to reset whenever a partition is recreated or looked up again.
+//
+// Deprecated: Use TopicPartitionKey in production code; without the topic in
+// the key, two topics sharing a partition layout would collide.
+func PartitionKey(partition *schema_pb.Partition) string {
+ return fmt.Sprintf("ring:%d:range:%d-%d",
+ partition.RingSize, partition.RangeStart, partition.RangeStop)
+}
+
+// partitionKey is the internal lowercase version for backward compatibility within this package
+func partitionKey(partition *schema_pb.Partition) string {
+ return PartitionKey(partition)
+}
+
+// OffsetAssignment represents an assigned offset with metadata
+type OffsetAssignment struct {
+ Offset int64
+ Timestamp int64
+ Partition *schema_pb.Partition
+}
+
+// BatchOffsetAssignment represents a batch of assigned offsets
+type BatchOffsetAssignment struct {
+ BaseOffset int64
+ LastOffset int64
+ Count int64
+ Timestamp int64
+ Partition *schema_pb.Partition
+}
+
+// AssignmentResult contains the result of an offset assignment: on success
+// exactly one of Assignment or Batch is populated; on failure Error is set.
+type AssignmentResult struct {
+ Assignment *OffsetAssignment
+ Batch *BatchOffsetAssignment
+ Error error
+}
+
+// OffsetAssigner provides high-level offset assignment operations
+type OffsetAssigner struct {
+ registry *PartitionOffsetRegistry
+}
+
+// NewOffsetAssigner creates a new offset assigner
+func NewOffsetAssigner(storage OffsetStorage) *OffsetAssigner {
+ return &OffsetAssigner{
+ registry: NewPartitionOffsetRegistry(storage),
+ }
+}
+
+// AssignSingleOffset assigns a single offset with timestamp
+func (a *OffsetAssigner) AssignSingleOffset(namespace, topicName string, partition *schema_pb.Partition) *AssignmentResult {
+ offset, err := a.registry.AssignOffset(namespace, topicName, partition)
+ if err != nil {
+ return &AssignmentResult{Error: err}
+ }
+
+ return &AssignmentResult{
+ Assignment: &OffsetAssignment{
+ Offset: offset,
+ Timestamp: time.Now().UnixNano(),
+ Partition: partition,
+ },
+ }
+}
+
+// AssignBatchOffsets assigns a batch of offsets with timestamp
+func (a *OffsetAssigner) AssignBatchOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) *AssignmentResult {
+ baseOffset, lastOffset, err := a.registry.AssignOffsets(namespace, topicName, partition, count)
+ if err != nil {
+ return &AssignmentResult{Error: err}
+ }
+
+ return &AssignmentResult{
+ Batch: &BatchOffsetAssignment{
+ BaseOffset: baseOffset,
+ LastOffset: lastOffset,
+ Count: count,
+ Timestamp: time.Now().UnixNano(),
+ Partition: partition,
+ },
+ }
+}
+
+// GetHighWaterMark returns the high water mark for a partition
+func (a *OffsetAssigner) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+ return a.registry.GetHighWaterMark(namespace, topicName, partition)
+	return a.registry.GetHighWaterMark(namespace, topicName, partition)
+}
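+
+// Example usage (a sketch: newMemoryOffsetStorage above is illustrative, and
+// any durable OffsetStorage implementation can be substituted):
+//
+//	assigner := NewOffsetAssigner(newMemoryOffsetStorage())
+//	result := assigner.AssignBatchOffsets("test", "events", partition, 3)
+//	if result.Error == nil {
+//		fmt.Println(result.Batch.BaseOffset, result.Batch.LastOffset) // 0 2
+//	}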