aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscriber.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
-rw-r--r--weed/mq/client/sub_client/subscriber.go23
1 files changed, 12 insertions, 11 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 9a51ce01e..ec15d998e 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -1,6 +1,7 @@
package sub_client
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -17,50 +18,50 @@ type SubscriberConfiguration struct {
SlidingWindowSize int32 // how many messages to process concurrently per partition
}
+func (s *SubscriberConfiguration) String() string {
+ return "ClientId: " + s.ClientId + ", ConsumerGroup: " + s.ConsumerGroup + ", ConsumerGroupInstanceId: " + s.ConsumerGroupInstanceId
+}
+
type ContentConfiguration struct {
Topic topic.Topic
Filter string
PartitionOffsets []*schema_pb.PartitionOffset
+ OffsetType schema_pb.OffsetType
+ OffsetTsNs int64
}
type OnDataMessageFn func(m *mq_pb.SubscribeMessageResponse_Data)
-type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
+ ctx context.Context
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
- OnDataMessageFnnc OnDataMessageFn
- OnEachMessageFunc OnEachMessageFunc
+ OnDataMessageFunc OnDataMessageFn
OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string
- waitForMoreMessage bool
activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex
PartitionOffsetChan chan KeyedOffset
}
-func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber {
+func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber {
return &TopicSubscriber{
+ ctx: ctx,
SubscriberConfig: subscriber,
ContentConfig: content,
brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024),
brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024),
bootstrapBrokers: bootstrapBrokers,
- waitForMoreMessage: true,
activeProcessors: make(map[topic.Partition]*ProcessorState),
PartitionOffsetChan: partitionOffsetChan,
}
}
-func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
- sub.OnEachMessageFunc = onEachMessageFn
-}
-
func (sub *TopicSubscriber) SetOnDataMessageFn(fn OnDataMessageFn) {
- sub.OnDataMessageFnnc = fn
+ sub.OnDataMessageFunc = fn
}
func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {