diff options
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 9a51ce01e..ec15d998e 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -1,6 +1,7 @@ package sub_client import ( + "context" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -17,50 +18,50 @@ type SubscriberConfiguration struct { SlidingWindowSize int32 // how many messages to process concurrently per partition } +func (s *SubscriberConfiguration) String() string { + return "ClientId: " + s.ClientId + ", ConsumerGroup: " + s.ConsumerGroup + ", ConsumerGroupInstanceId: " + s.ConsumerGroupInstanceId +} + type ContentConfiguration struct { Topic topic.Topic Filter string PartitionOffsets []*schema_pb.PartitionOffset + OffsetType schema_pb.OffsetType + OffsetTsNs int64 } type OnDataMessageFn func(m *mq_pb.SubscribeMessageResponse_Data) -type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { + ctx context.Context SubscriberConfig *SubscriberConfiguration ContentConfig *ContentConfiguration brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest - OnDataMessageFnnc OnDataMessageFn - OnEachMessageFunc OnEachMessageFunc + OnDataMessageFunc OnDataMessageFn OnCompletionFunc OnCompletionFunc bootstrapBrokers []string - waitForMoreMessage bool activeProcessors map[topic.Partition]*ProcessorState activeProcessorsLock sync.Mutex PartitionOffsetChan chan KeyedOffset } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { +func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { return &TopicSubscriber{ + ctx: ctx, SubscriberConfig: subscriber, ContentConfig: content, brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024), brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024), bootstrapBrokers: bootstrapBrokers, - waitForMoreMessage: true, activeProcessors: make(map[topic.Partition]*ProcessorState), PartitionOffsetChan: partitionOffsetChan, } } -func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) { - sub.OnEachMessageFunc = onEachMessageFn -} - func (sub *TopicSubscriber) SetOnDataMessageFn(fn OnDataMessageFn) { - sub.OnDataMessageFnnc = fn + sub.OnDataMessageFunc = fn } func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) { |
