diff options
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 3e5316b67..9a51ce01e 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -3,9 +3,9 @@ package sub_client import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" "sync" - "time" ) type SubscriberConfiguration struct { @@ -14,16 +14,16 @@ type SubscriberConfiguration struct { ConsumerGroupInstanceId string GrpcDialOption grpc.DialOption MaxPartitionCount int32 // how many partitions to process concurrently - PerPartitionConcurrency int32 // how many messages to process concurrently per partition + SlidingWindowSize int32 // how many messages to process concurrently per partition } type ContentConfiguration struct { - Topic topic.Topic - Filter string - StartTime time.Time - StopTime time.Time + Topic topic.Topic + Filter string + PartitionOffsets []*schema_pb.PartitionOffset } +type OnDataMessageFn func(m *mq_pb.SubscribeMessageResponse_Data) type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() @@ -32,15 +32,17 @@ type TopicSubscriber struct { ContentConfig *ContentConfiguration brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest + OnDataMessageFnnc OnDataMessageFn OnEachMessageFunc OnEachMessageFunc OnCompletionFunc OnCompletionFunc bootstrapBrokers []string waitForMoreMessage bool activeProcessors map[topic.Partition]*ProcessorState activeProcessorsLock sync.Mutex + PartitionOffsetChan chan KeyedOffset } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { return &TopicSubscriber{ SubscriberConfig: subscriber, ContentConfig: content, @@ -49,6 +51,7 @@ func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfigu bootstrapBrokers: bootstrapBrokers, waitForMoreMessage: true, activeProcessors: make(map[topic.Partition]*ProcessorState), + PartitionOffsetChan: partitionOffsetChan, } } @@ -56,6 +59,10 @@ func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc sub.OnEachMessageFunc = onEachMessageFn } +func (sub *TopicSubscriber) SetOnDataMessageFn(fn OnDataMessageFn) { + sub.OnDataMessageFnnc = fn +} + func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) { sub.OnCompletionFunc = onCompletionFn } |
