aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/subscriber.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/offset/subscriber.go')
-rw-r--r--weed/mq/offset/subscriber.go355
1 files changed, 355 insertions, 0 deletions
diff --git a/weed/mq/offset/subscriber.go b/weed/mq/offset/subscriber.go
new file mode 100644
index 000000000..d39932aae
--- /dev/null
+++ b/weed/mq/offset/subscriber.go
@@ -0,0 +1,355 @@
+package offset
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// OffsetSubscriber handles offset-based subscription logic: it creates, looks up and closes OffsetSubscription values keyed by subscription ID.
+type OffsetSubscriber struct {
+ mu sync.RWMutex // guards subscriptions; taken by CreateSubscription, GetSubscription and CloseSubscription
+ offsetRegistry *PartitionOffsetRegistry // queried for high water marks; also handed to each subscription created here
+ subscriptions map[string]*OffsetSubscription // open subscriptions keyed by subscription ID
+}
+
+// OffsetSubscription represents an active offset-based subscription. NOTE(review): its own methods read and write these fields without taking any lock.
+type OffsetSubscription struct {
+ ID string // subscription ID; the key under which OffsetSubscriber stores this value
+ Namespace string // passed to GetHighWaterMark together with TopicName and Partition
+ TopicName string
+ Partition *schema_pb.Partition
+ StartOffset int64 // offset resolved at creation; not modified by the methods in this file
+ CurrentOffset int64 // next offset to read; changed by SeekToOffset, AdvanceOffset and AdvanceOffsetBy
+ OffsetType schema_pb.OffsetType // offset type requested at creation
+ IsActive bool // true on creation; set to false by OffsetSubscriber.CloseSubscription
+ offsetRegistry *PartitionOffsetRegistry // used to look up the partition's high water mark
+}
+
+// NewOffsetSubscriber builds an OffsetSubscriber backed by offsetRegistry,
+// starting with an empty subscription table.
+func NewOffsetSubscriber(offsetRegistry *PartitionOffsetRegistry) *OffsetSubscriber {
+	subscriber := new(OffsetSubscriber)
+	subscriber.offsetRegistry = offsetRegistry
+	subscriber.subscriptions = map[string]*OffsetSubscription{}
+	return subscriber
+}
+
+// CreateSubscription registers a new subscription under subscriptionID.
+//
+// The start position is derived from offsetType and startOffset via
+// resolveStartOffset; both StartOffset and CurrentOffset of the returned
+// subscription are set to that resolved value. An error is returned when the
+// ID is already registered or when the start offset cannot be resolved. The
+// subscriber's write lock is held for the whole call.
+func (s *OffsetSubscriber) CreateSubscription(
+	subscriptionID string,
+	namespace, topicName string,
+	partition *schema_pb.Partition,
+	offsetType schema_pb.OffsetType,
+	startOffset int64,
+) (*OffsetSubscription, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Subscription IDs are unique within one subscriber.
+	if _, duplicate := s.subscriptions[subscriptionID]; duplicate {
+		return nil, fmt.Errorf("subscription %s already exists", subscriptionID)
+	}
+
+	// Turn the requested (type, offset) pair into a concrete position.
+	resolved, err := s.resolveStartOffset(namespace, topicName, partition, offsetType, startOffset)
+	if err != nil {
+		return nil, fmt.Errorf("failed to resolve start offset: %w", err)
+	}
+
+	created := &OffsetSubscription{
+		ID:             subscriptionID,
+		Namespace:      namespace,
+		TopicName:      topicName,
+		Partition:      partition,
+		OffsetType:     offsetType,
+		StartOffset:    resolved,
+		CurrentOffset:  resolved,
+		IsActive:       true,
+		offsetRegistry: s.offsetRegistry,
+	}
+	s.subscriptions[subscriptionID] = created
+
+	return created, nil
+}
+
+// GetSubscription looks up the subscription registered under subscriptionID.
+// It returns an error when no such subscription is registered.
+func (s *OffsetSubscriber) GetSubscription(subscriptionID string) (*OffsetSubscription, error) {
+	s.mu.RLock()
+	found, ok := s.subscriptions[subscriptionID]
+	s.mu.RUnlock()
+
+	if ok {
+		return found, nil
+	}
+	return nil, fmt.Errorf("subscription %s not found", subscriptionID)
+}
+
+// CloseSubscription marks the subscription registered under subscriptionID as
+// inactive and drops it from the subscriber. It returns an error when the ID
+// is not registered.
+func (s *OffsetSubscriber) CloseSubscription(subscriptionID string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if target, ok := s.subscriptions[subscriptionID]; ok {
+		// Flip the flag first so callers still holding the pointer see
+		// the subscription as closed.
+		target.IsActive = false
+		delete(s.subscriptions, subscriptionID)
+		return nil
+	}
+	return fmt.Errorf("subscription %s not found", subscriptionID)
+}
+
+// resolveStartOffset maps an (offsetType, requestedOffset) pair to the
+// concrete offset a new subscription starts from:
+//
+//   - EXACT_OFFSET: requestedOffset, after validateAndGetOffset accepts it
+//   - RESET_TO_OFFSET: requestedOffset as given, with no validation
+//   - RESET_TO_EARLIEST, RESUME_OR_EARLIEST: 0
+//   - RESET_TO_LATEST, RESUME_OR_LATEST: the current high water mark
+//
+// The RESUME_* types do not yet consult a saved position; they behave like
+// their RESET_* fallbacks. Any other type yields an error.
+func (s *OffsetSubscriber) resolveStartOffset(
+	namespace, topicName string,
+	partition *schema_pb.Partition,
+	offsetType schema_pb.OffsetType,
+	requestedOffset int64,
+) (int64, error) {
+	switch offsetType {
+	case schema_pb.OffsetType_EXACT_OFFSET:
+		// The requested offset must already exist.
+		return s.validateAndGetOffset(namespace, topicName, partition, requestedOffset)
+
+	case schema_pb.OffsetType_RESET_TO_OFFSET:
+		// Taken as-is, even if nothing has been written there yet.
+		return requestedOffset, nil
+
+	case schema_pb.OffsetType_RESET_TO_EARLIEST,
+		schema_pb.OffsetType_RESUME_OR_EARLIEST:
+		// Earliest is offset 0. Resuming from a saved position is not
+		// implemented yet, so RESUME_OR_EARLIEST uses its fallback.
+		return 0, nil
+
+	case schema_pb.OffsetType_RESET_TO_LATEST,
+		schema_pb.OffsetType_RESUME_OR_LATEST:
+		// Latest is the current high water mark. Resuming from a saved
+		// position is not implemented yet, so RESUME_OR_LATEST uses its
+		// fallback.
+		latest, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+		if err != nil {
+			return 0, err
+		}
+		return latest, nil
+	}
+
+	return 0, fmt.Errorf("unsupported offset type: %v", offsetType)
+}
+
+// validateAndGetOffset returns offset unchanged when it is non-negative and
+// strictly below the partition's high water mark; otherwise it returns an
+// error describing which bound was violated.
+func (s *OffsetSubscriber) validateAndGetOffset(namespace, topicName string, partition *schema_pb.Partition, offset int64) (int64, error) {
+	if offset < 0 {
+		return 0, fmt.Errorf("offset cannot be negative: %d", offset)
+	}
+
+	hwm, err := s.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+	switch {
+	case err != nil:
+		return 0, fmt.Errorf("failed to get high water mark: %w", err)
+	case offset >= hwm:
+		// Offsets at or past the high water mark are rejected.
+		return 0, fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm)
+	}
+
+	return offset, nil
+}
+
+// SeekToOffset repositions the subscription so that offset is the next one
+// read. The subscription must be active and offset must lie in
+// [0, high water mark]; seeking exactly to the high water mark is allowed.
+func (sub *OffsetSubscription) SeekToOffset(offset int64) error {
+	switch {
+	case !sub.IsActive:
+		return fmt.Errorf("subscription is not active")
+	case offset < 0:
+		return fmt.Errorf("offset cannot be negative: %d", offset)
+	}
+
+	limit, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+	if err != nil {
+		return fmt.Errorf("failed to get high water mark: %w", err)
+	}
+
+	if offset <= limit {
+		sub.CurrentOffset = offset
+		return nil
+	}
+	return fmt.Errorf("offset %d is beyond high water mark %d", offset, limit)
+}
+
+// GetNextOffset reports the offset the subscription will read next, which is
+// its current position.
+func (sub *OffsetSubscription) GetNextOffset() int64 {
+	next := sub.CurrentOffset
+	return next
+}
+
+// AdvanceOffset moves the subscription's current position forward by one.
+func (sub *OffsetSubscription) AdvanceOffset() {
+	sub.CurrentOffset += 1
+}
+
+// GetLag reports how many offsets the subscription is behind the partition's
+// high water mark. The result is never negative: a position at or past the
+// high water mark yields 0. An inactive subscription yields an error.
+func (sub *OffsetSubscription) GetLag() (int64, error) {
+	if !sub.IsActive {
+		return 0, fmt.Errorf("subscription is not active")
+	}
+
+	hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get high water mark: %w", err)
+	}
+
+	if behind := hwm - sub.CurrentOffset; behind > 0 {
+		return behind, nil
+	}
+	return 0, nil
+}
+
+// IsAtEnd reports whether the subscription's current position has reached or
+// passed the partition's high water mark. An inactive subscription yields
+// (true, error); a failed high water mark lookup yields (false, error).
+func (sub *OffsetSubscription) IsAtEnd() (bool, error) {
+	if !sub.IsActive {
+		return true, fmt.Errorf("subscription is not active")
+	}
+
+	hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+	if err != nil {
+		return false, fmt.Errorf("failed to get high water mark: %w", err)
+	}
+
+	caughtUp := !(sub.CurrentOffset < hwm)
+	return caughtUp, nil
+}
+
+// OffsetRange represents a range of offsets; the code in this file encodes an empty range as EndOffset == StartOffset-1 with Count 0.
+type OffsetRange struct {
+ StartOffset int64 // first offset in the range
+ EndOffset int64 // last offset in the range, inclusive
+ Count int64 // number of offsets in the range (EndOffset - StartOffset + 1)
+}
+
+// GetOffsetRange returns the next batch of offsets to read, starting at the
+// subscription's current position and containing at most maxCount offsets,
+// never extending to or past the partition's high water mark.
+//
+// An empty range (EndOffset == StartOffset-1, Count == 0) is returned when
+// maxCount is zero or negative, or when the current position is already at
+// or beyond the high water mark. The subscription's position is not changed;
+// callers advance it separately (e.g. with AdvanceOffsetBy).
+//
+// An error is returned when the subscription is inactive or the high water
+// mark cannot be obtained.
+func (sub *OffsetSubscription) GetOffsetRange(maxCount int64) (*OffsetRange, error) {
+	if !sub.IsActive {
+		return nil, fmt.Errorf("subscription is not active")
+	}
+
+	hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Namespace, sub.TopicName, sub.Partition)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get high water mark: %w", err)
+	}
+
+	startOffset := sub.CurrentOffset
+
+	// Nothing to hand out: either no offsets were requested or there is no
+	// unread data. Treating a negative maxCount as empty keeps Count from
+	// ever being negative.
+	if maxCount <= 0 || startOffset >= hwm {
+		return &OffsetRange{
+			StartOffset: startOffset,
+			EndOffset:   startOffset - 1, // Empty range
+			Count:       0,
+		}, nil
+	}
+
+	// Clamp the count rather than computing startOffset+maxCount-1 first:
+	// that sum wraps around for very large maxCount values (for example
+	// math.MaxInt64 used as "everything available") and would produce a
+	// negative EndOffset and Count. startOffset < hwm holds here, so the
+	// number of available offsets is at least 1.
+	count := hwm - startOffset
+	if maxCount < count {
+		count = maxCount
+	}
+
+	return &OffsetRange{
+		StartOffset: startOffset,
+		EndOffset:   startOffset + count - 1,
+		Count:       count,
+	}, nil
+}
+
+// AdvanceOffsetBy adds count to the subscription's current position. The
+// value is applied as given, with no validation.
+func (sub *OffsetSubscription) AdvanceOffsetBy(count int64) {
+	sub.CurrentOffset = sub.CurrentOffset + count
+}
+
+// OffsetSeeker provides utilities for offset-based seeking; it validates and reports offset ranges against a partition's high water mark.
+type OffsetSeeker struct {
+ offsetRegistry *PartitionOffsetRegistry // queried for high water marks by ValidateOffsetRange and GetAvailableOffsetRange
+}
+
+// NewOffsetSeeker builds an OffsetSeeker that answers queries using
+// offsetRegistry.
+func NewOffsetSeeker(offsetRegistry *PartitionOffsetRegistry) *OffsetSeeker {
+	seeker := new(OffsetSeeker)
+	seeker.offsetRegistry = offsetRegistry
+	return seeker
+}
+
+// SeekToTimestamp is intended to find the offset closest to timestamp,
+// bridging timestamp-based and offset-based seeking.
+//
+// It is currently a stub: both arguments are ignored and the call always
+// returns offset 0 with an error.
+func (seeker *OffsetSeeker) SeekToTimestamp(partition *schema_pb.Partition, timestamp int64) (int64, error) {
+	// TODO: mapping timestamps to offsets needs integration with the
+	// storage layer; until then, report the feature as unavailable.
+	errNotImplemented := fmt.Errorf("timestamp-to-offset mapping not implemented yet")
+	return 0, errNotImplemented
+}
+
+// ValidateOffsetRange checks that [startOffset, endOffset] is a well-formed
+// inclusive range lying entirely below the partition's high water mark. It
+// returns nil when the range is valid and a descriptive error otherwise.
+func (seeker *OffsetSeeker) ValidateOffsetRange(namespace, topicName string, partition *schema_pb.Partition, startOffset, endOffset int64) error {
+	// Shape checks that need no registry lookup.
+	switch {
+	case startOffset < 0:
+		return fmt.Errorf("start offset cannot be negative: %d", startOffset)
+	case endOffset < startOffset:
+		return fmt.Errorf("end offset %d cannot be less than start offset %d", endOffset, startOffset)
+	}
+
+	hwm, err := seeker.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+	if err != nil {
+		return fmt.Errorf("failed to get high water mark: %w", err)
+	}
+
+	// Both ends must refer to offsets below the high water mark; the start
+	// is reported first when both are out of range.
+	switch {
+	case startOffset >= hwm:
+		return fmt.Errorf("start offset %d is beyond high water mark %d", startOffset, hwm)
+	case endOffset >= hwm:
+		return fmt.Errorf("end offset %d is beyond high water mark %d", endOffset, hwm)
+	}
+
+	return nil
+}
+
+// GetAvailableOffsetRange describes the offsets currently available in a
+// partition as the inclusive range [0, high water mark - 1]. When the high
+// water mark is 0 this is the empty range (StartOffset 0, EndOffset -1,
+// Count 0).
+func (seeker *OffsetSeeker) GetAvailableOffsetRange(namespace, topicName string, partition *schema_pb.Partition) (*OffsetRange, error) {
+	hwm, err := seeker.offsetRegistry.GetHighWaterMark(namespace, topicName, partition)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get high water mark: %w", err)
+	}
+
+	// With hwm == 0 the same expressions give EndOffset -1 and Count 0, so
+	// the "no data" case needs no separate branch.
+	available := &OffsetRange{
+		StartOffset: 0,
+		EndOffset:   hwm - 1,
+		Count:       hwm,
+	}
+	return available, nil
+}