diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub_offset.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_offset.go | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_offset.go b/weed/mq/broker/broker_grpc_sub_offset.go new file mode 100644 index 000000000..6cb661464 --- /dev/null +++ b/weed/mq/broker/broker_grpc_sub_offset.go @@ -0,0 +1,253 @@ +package broker + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/offset" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" +) + +// SubscribeWithOffset handles subscription requests with offset-based positioning +// TODO: This extends the broker with offset-aware subscription support +// ASSUMPTION: This will eventually be integrated into the main SubscribeMessage method +func (b *MessageQueueBroker) SubscribeWithOffset( + ctx context.Context, + req *mq_pb.SubscribeMessageRequest, + stream mq_pb.SeaweedMessaging_SubscribeMessageServer, + offsetType schema_pb.OffsetType, + startOffset int64, +) error { + + initMessage := req.GetInit() + if initMessage == nil { + return fmt.Errorf("missing init message") + } + + // Extract partition information from the request + t := topic.FromPbTopic(initMessage.Topic) + + // Get partition from the request's partition_offset field + if initMessage.PartitionOffset == nil || initMessage.PartitionOffset.Partition == nil { + return fmt.Errorf("missing partition information in request") + } + + // Use the partition information from the request + p := topic.Partition{ + RingSize: initMessage.PartitionOffset.Partition.RingSize, + RangeStart: initMessage.PartitionOffset.Partition.RangeStart, + RangeStop: initMessage.PartitionOffset.Partition.RangeStop, + UnixTimeNs: initMessage.PartitionOffset.Partition.UnixTimeNs, + } + + // Create offset-based subscription + subscriptionID := fmt.Sprintf("%s-%s-%d", initMessage.ConsumerGroup, initMessage.ConsumerId, startOffset) + subscription, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, offsetType, startOffset) + if err != nil { + return fmt.Errorf("failed to create offset subscription: %w", err) + } + + defer func() { + if closeErr := b.offsetManager.CloseSubscription(subscriptionID); closeErr != nil { + glog.V(0).Infof("Failed to close subscription %s: %v", subscriptionID, closeErr) + } + }() + + // Get local partition for reading + localTopicPartition, err := b.GetOrGenerateLocalPartition(t, p) + if err != nil { + return fmt.Errorf("topic %v partition %v not found: %v", t, p, err) + } + + // Subscribe to messages using offset-based positioning + return b.subscribeWithOffsetSubscription(ctx, localTopicPartition, subscription, stream, initMessage) +} + +// subscribeWithOffsetSubscription handles the actual message consumption with offset tracking +func (b *MessageQueueBroker) subscribeWithOffsetSubscription( + ctx context.Context, + localPartition *topic.LocalPartition, + subscription *offset.OffsetSubscription, + stream mq_pb.SeaweedMessaging_SubscribeMessageServer, + initMessage *mq_pb.SubscribeMessageRequest_InitMessage, +) error { + + clientName := fmt.Sprintf("%s-%s", initMessage.ConsumerGroup, initMessage.ConsumerId) + + // TODO: Implement offset-based message reading + // ASSUMPTION: For now, we'll use the existing subscription mechanism and track offsets separately + // This should be replaced with proper offset-based reading from storage + + // Convert the subscription's current offset to a proper MessagePosition + startPosition, err := b.convertOffsetToMessagePosition(subscription) + if err != nil { + return fmt.Errorf("failed to convert offset to message position: %w", err) + } + + glog.V(0).Infof("[%s] Starting Subscribe for topic %s partition %d-%d at offset %d", + clientName, subscription.TopicName, subscription.Partition.RangeStart, subscription.Partition.RangeStop, subscription.CurrentOffset) + + return localPartition.Subscribe(clientName, + startPosition, + func() bool { + // Check if context is cancelled (client disconnected) + select { + case <-ctx.Done(): + glog.V(0).Infof("[%s] Context cancelled, stopping", clientName) + return false + default: + } + + // Check if subscription is still active and not at end + if !subscription.IsActive { + glog.V(0).Infof("[%s] Subscription not active, stopping", clientName) + return false + } + + atEnd, err := subscription.IsAtEnd() + if err != nil { + glog.V(0).Infof("[%s] Error checking if subscription at end: %v", clientName, err) + return false + } + + if atEnd { + glog.V(2).Infof("[%s] At end of subscription, stopping", clientName) + return false + } + + // Add a small sleep to avoid CPU busy-wait when checking for new data + time.Sleep(10 * time.Millisecond) + return true + }, + func(logEntry *filer_pb.LogEntry) (bool, error) { + // Check if this message matches our offset requirements + currentOffset := subscription.GetNextOffset() + + if logEntry.Offset < currentOffset { + // Skip messages before our current offset + return false, nil + } + + // Send message to client + if err := stream.Send(&mq_pb.SubscribeMessageResponse{ + Message: &mq_pb.SubscribeMessageResponse_Data{ + Data: &mq_pb.DataMessage{ + Key: logEntry.Key, + Value: logEntry.Data, + TsNs: logEntry.TsNs, + }, + }, + }); err != nil { + glog.Errorf("Error sending data to %s: %v", clientName, err) + return false, err + } + + // Advance subscription offset + subscription.AdvanceOffset() + + // Check context for cancellation + select { + case <-ctx.Done(): + return true, ctx.Err() + default: + return false, nil + } + }) +} + +// GetSubscriptionInfo returns information about an active subscription +func (b *MessageQueueBroker) GetSubscriptionInfo(subscriptionID string) (map[string]interface{}, error) { + subscription, err := b.offsetManager.GetSubscription(subscriptionID) + if err != nil { + return nil, err + } + + lag, err := subscription.GetLag() + if err != nil { + return nil, err + } + + atEnd, err := subscription.IsAtEnd() + if err != nil { + return nil, err + } + + return map[string]interface{}{ + "subscription_id": subscription.ID, + "start_offset": subscription.StartOffset, + "current_offset": subscription.CurrentOffset, + "offset_type": subscription.OffsetType.String(), + "is_active": subscription.IsActive, + "lag": lag, + "at_end": atEnd, + }, nil +} + +// ListActiveSubscriptions returns information about all active subscriptions +func (b *MessageQueueBroker) ListActiveSubscriptions() ([]map[string]interface{}, error) { + subscriptions, err := b.offsetManager.ListActiveSubscriptions() + if err != nil { + return nil, err + } + + result := make([]map[string]interface{}, len(subscriptions)) + for i, subscription := range subscriptions { + lag, _ := subscription.GetLag() + atEnd, _ := subscription.IsAtEnd() + + result[i] = map[string]interface{}{ + "subscription_id": subscription.ID, + "start_offset": subscription.StartOffset, + "current_offset": subscription.CurrentOffset, + "offset_type": subscription.OffsetType.String(), + "is_active": subscription.IsActive, + "lag": lag, + "at_end": atEnd, + } + } + + return result, nil +} + +// SeekSubscription seeks an existing subscription to a specific offset +func (b *MessageQueueBroker) SeekSubscription(subscriptionID string, offset int64) error { + subscription, err := b.offsetManager.GetSubscription(subscriptionID) + if err != nil { + return err + } + + return subscription.SeekToOffset(offset) +} + +// convertOffsetToMessagePosition converts a subscription's current offset to a MessagePosition for log_buffer +func (b *MessageQueueBroker) convertOffsetToMessagePosition(subscription *offset.OffsetSubscription) (log_buffer.MessagePosition, error) { + currentOffset := subscription.GetNextOffset() + + // Handle special offset cases + switch subscription.OffsetType { + case schema_pb.OffsetType_RESET_TO_EARLIEST: + return log_buffer.NewMessagePosition(1, -3), nil + + case schema_pb.OffsetType_RESET_TO_LATEST: + return log_buffer.NewMessagePosition(time.Now().UnixNano(), -4), nil + + case schema_pb.OffsetType_EXACT_OFFSET: + // Use proper offset-based positioning that provides consistent results + // This uses the same approach as the main subscription handler in broker_grpc_sub.go + return log_buffer.NewMessagePositionFromOffset(currentOffset), nil + + case schema_pb.OffsetType_EXACT_TS_NS: + // For exact timestamps, use the timestamp directly + return log_buffer.NewMessagePosition(currentOffset, -2), nil + + default: + // Default to starting from current time for unknown offset types + return log_buffer.NewMessagePosition(time.Now().UnixNano(), -2), nil + } +} |
