diff options
Diffstat (limited to 'weed/mq/client')
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go | 21 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_record/subscriber_record.go | 22 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publish.go | 1 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 10 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 118 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 125 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 110 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 46 |
8 files changed, 297 insertions, 156 deletions
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index adcdda04c..902e7ed1b 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -14,9 +14,11 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -25,10 +27,11 @@ func main() { flag.Parse() subscriberConfig := &sub_client.SubscriberConfiguration{ - ClientId: fmt.Sprintf("client-%d", *clientId), ConsumerGroup: "test", ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + MaxPartitionCount: int32(*maxPartitionCount), + PerPartitionConcurrency: int32(*perPartitionConcurrency), } contentConfig := &sub_client.ContentConfiguration{ @@ -37,18 +40,14 @@ func main() { StartTime: time.Unix(1, 1), } - processorConfig := sub_client.ProcessorConfiguration{ - ConcurrentPartitionLimit: 3, - } - brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) + subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig) counter := 0 - subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) { + subscriber.SetEachMessageFunc(func(key, value []byte) error { counter++ println(string(key), "=>", string(value), counter) - return true, nil + return nil }) subscriber.SetCompletionFunc(func() { diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 53eb4f15b..674c881ba 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -16,9 +16,11 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -51,10 +53,11 @@ func main() { flag.Parse() subscriberConfig := &sub_client.SubscriberConfiguration{ - ClientId: fmt.Sprintf("client-%d", *clientId), ConsumerGroup: "test", ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + MaxPartitionCount: int32(*maxPartitionCount), + PerPartitionConcurrency: int32(*perPartitionConcurrency), } contentConfig := &sub_client.ContentConfiguration{ @@ -63,20 +66,17 @@ func main() { StartTime: time.Unix(1, 1), } - processorConfig := sub_client.ProcessorConfiguration{ - ConcurrentPartitionLimit: 3, - } - brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) + subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig) counter := 0 - subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) { + subscriber.SetEachMessageFunc(func(key, value []byte) error { counter++ record := &schema_pb.RecordValue{} proto.Unmarshal(value, record) fmt.Printf("record: %v\n", record) - return true, nil + time.Sleep(1300 * time.Millisecond) + return nil }) subscriber.SetCompletionFunc(func() { diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index a25620de1..a85eec31f 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -51,6 +51,7 @@ func (p *TopicPublisher) FinishPublish() error { TsNs: time.Now().UnixNano(), Ctrl: &mq_pb.ControlMessage{ IsClose: true, + PublisherName: p.config.PublisherName, }, }) } diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 03377d653..df2270b2c 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -142,11 +142,11 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if err = publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: p.config.Topic.ToPbTopic(), - Partition: job.Partition, - AckInterval: 128, - FollowerBrokers: job.FollowerBrokers, - PublisherName: p.config.PublisherName, + Topic: p.config.Topic.ToPbTopic(), + Partition: job.Partition, + AckInterval: 128, + FollowerBroker: job.FollowerBroker, + PublisherName: p.config.PublisherName, }, }, }); err != nil { diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 2f1330b5e..d05ddb960 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -2,12 +2,9 @@ package sub_client import ( "context" - "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "io" - "sync" "time" ) @@ -51,6 +48,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: sub.ContentConfig.Topic.ToPbTopic(), + MaxPartitionCount: sub.SubscriberConfig.MaxPartitionCount, }, }, }); err != nil { @@ -58,6 +56,16 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { return err } + go func() { + for reply := range sub.brokerPartitionAssignmentAckChan { + glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply) + if err := stream.Send(reply); err != nil { + glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) + return + } + } + }() + // keep receiving messages from the sub coordinator for { resp, err := stream.Recv() @@ -65,11 +73,8 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err) return err } - assignment := resp.GetAssignment() - if assignment != nil { - glog.V(0).Infof("subscriber %s receive assignment: %v", sub.ContentConfig.Topic, assignment) - } - sub.onEachAssignment(assignment) + sub.brokerPartitionAssignmentChan <- resp + glog.V(0).Infof("Received assignment: %+v", resp) } return nil @@ -82,100 +87,3 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { time.Sleep(waitTime) } } - -func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCoordinatorResponse_Assignment) { - if assignment == nil { - return - } - // process each partition, with a concurrency limit - var wg sync.WaitGroup - semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit) - - for _, assigned := range assignment.PartitionAssignments { - wg.Add(1) - semaphore <- struct{}{} - go func(assigned *mq_pb.BrokerPartitionAssignment) { - defer wg.Done() - defer func() { <-semaphore }() - glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) - err := sub.onEachPartition(assigned) - if err != nil { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) - } - }(assigned) - } - - wg.Wait() -} - -func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { - // connect to the partition broker - return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Init{ - Init: &mq_pb.SubscribeMessageRequest_InitMessage{ - ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, - ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, - Topic: sub.ContentConfig.Topic.ToPbTopic(), - PartitionOffset: &mq_pb.PartitionOffset{ - Partition: assigned.Partition, - StartTsNs: sub.alreadyProcessedTsNs, - StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, - }, - Filter: sub.ContentConfig.Filter, - FollowerBrokers: assigned.FollowerBrokers, - }, - }, - }) - - if err != nil { - return fmt.Errorf("create subscribe client: %v", err) - } - - glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) - - if sub.OnCompletionFunc != nil { - defer sub.OnCompletionFunc() - } - defer func() { - subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Ack{ - Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Sequence: 0, - }, - }, - }) - subscribeClient.CloseSend() - }() - - for { - // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) - resp, err := subscribeClient.Recv() - if err != nil { - return fmt.Errorf("subscribe recv: %v", err) - } - if resp.Message == nil { - glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) - continue - } - switch m := resp.Message.(type) { - case *mq_pb.SubscribeMessageResponse_Data: - shouldContinue, processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) - if processErr != nil { - return fmt.Errorf("process error: %v", processErr) - } - sub.alreadyProcessedTsNs = m.Data.TsNs - if !shouldContinue { - return nil - } - case *mq_pb.SubscribeMessageResponse_Ctrl: - // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) - if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { - return io.EOF - } - } - } - - return nil - }) -} diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go new file mode 100644 index 000000000..5dcac4eb3 --- /dev/null +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -0,0 +1,125 @@ +package sub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "io" +) + +func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error { + // connect to the partition broker + return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + + subscribeClient, err := client.SubscribeMessage(context.Background()) + if err != nil { + return fmt.Errorf("create subscribe client: %v", err) + } + + perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency + if perPartitionConcurrency <= 0 { + perPartitionConcurrency = 1 + } + + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Init{ + Init: &mq_pb.SubscribeMessageRequest_InitMessage{ + ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, + ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, + Topic: sub.ContentConfig.Topic.ToPbTopic(), + PartitionOffset: &mq_pb.PartitionOffset{ + Partition: assigned.Partition, + StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, + }, + Filter: sub.ContentConfig.Filter, + FollowerBroker: assigned.FollowerBroker, + Concurrency: perPartitionConcurrency, + }, + }, + }); err != nil { + glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) + } + + glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) + + if sub.OnCompletionFunc != nil { + defer sub.OnCompletionFunc() + } + + type KeyedOffset struct { + Key []byte + Offset int64 + } + + partitionOffsetChan := make(chan KeyedOffset, 1024) + defer func() { + close(partitionOffsetChan) + }() + executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) + + go func() { + for { + select { + case <-stopCh: + subscribeClient.CloseSend() + return + case ack, ok := <-partitionOffsetChan: + if !ok { + subscribeClient.CloseSend() + return + } + subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Ack{ + Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ + Key: ack.Key, + Sequence: ack.Offset, + }, + }, + }) + } + } + }() + + var lastErr error + + for lastErr == nil { + // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + resp, err := subscribeClient.Recv() + if err != nil { + return fmt.Errorf("subscribe recv: %v", err) + } + if resp.Message == nil { + glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + continue + } + switch m := resp.Message.(type) { + case *mq_pb.SubscribeMessageResponse_Data: + if m.Data.Ctrl != nil { + glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.IsClose) + continue + } + executors.Execute(func() { + processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) + if processErr == nil { + partitionOffsetChan <- KeyedOffset{ + Key: m.Data.Key, + Offset: m.Data.TsNs, + } + } else { + lastErr = processErr + } + }) + case *mq_pb.SubscribeMessageResponse_Ctrl: + // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) + if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { + return io.EOF + } + } + } + + return lastErr + }) +} diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index df62ea674..5669bb348 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,11 +1,121 @@ package sub_client +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync" + "time" +) + +type ProcessorState struct { + stopCh chan struct{} +} + // Subscribe subscribes to a topic's specified partitions. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. func (sub *TopicSubscriber) Subscribe() error { + + go sub.startProcessors() + // loop forever sub.doKeepConnectedToSubCoordinator() return nil } + +func (sub *TopicSubscriber) startProcessors() { + // listen to the messages from the sub coordinator + // start one processor per partition + var wg sync.WaitGroup + semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount) + + for message := range sub.brokerPartitionAssignmentChan { + if assigned := message.GetAssignment(); assigned != nil { + wg.Add(1) + semaphore <- struct{}{} + + topicPartition := topic.FromPbPartition(assigned.PartitionAssignment.Partition) + + // wait until no covering partition is still in progress + sub.waitUntilNoOverlappingPartitionInFlight(topicPartition) + + // start a processors + stopChan := make(chan struct{}) + sub.activeProcessorsLock.Lock() + sub.activeProcessors[topicPartition] = &ProcessorState{ + stopCh: stopChan, + } + sub.activeProcessorsLock.Unlock() + + go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) { + defer func() { + sub.activeProcessorsLock.Lock() + delete(sub.activeProcessors, topicPartition) + sub.activeProcessorsLock.Unlock() + + <-semaphore + wg.Done() + }() + glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{ + Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{ + AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{ + Partition: assigned.Partition, + }, + }, + } + err := sub.onEachPartition(assigned, stopChan) + if err != nil { + glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) + } else { + glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + } + sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{ + Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{ + AckUnAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{ + Partition: assigned.Partition, + }, + }, + } + }(assigned.PartitionAssignment, topicPartition) + } + if unAssignment := message.GetUnAssignment(); unAssignment != nil { + topicPartition := topic.FromPbPartition(unAssignment.Partition) + sub.activeProcessorsLock.Lock() + if processor, found := sub.activeProcessors[topicPartition]; found { + close(processor.stopCh) + delete(sub.activeProcessors, topicPartition) + } + sub.activeProcessorsLock.Unlock() + } + } + + wg.Wait() + +} + +func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) { + foundOverlapping := true + for foundOverlapping { + sub.activeProcessorsLock.Lock() + foundOverlapping = false + var overlappedPartition topic.Partition + for partition, _ := range sub.activeProcessors { + if partition.Overlaps(topicPartition) { + if partition.Equals(topicPartition) { + continue + } + foundOverlapping = true + overlappedPartition = partition + break + } + } + sub.activeProcessorsLock.Unlock() + if foundOverlapping { + glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition) + time.Sleep(1 * time.Second) + } + } +} diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 982c3f13b..922593b77 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -4,6 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" + "sync" "time" ) @@ -11,10 +12,9 @@ type SubscriberConfiguration struct { ClientId string ConsumerGroup string ConsumerGroupInstanceId string - GroupMinimumPeers int32 - GroupMaximumPeers int32 - BootstrapServers []string GrpcDialOption grpc.DialOption + MaxPartitionCount int32 // how many partitions to process concurrently + PerPartitionConcurrency int32 // how many messages to process concurrently per partition } type ContentConfiguration struct { @@ -23,33 +23,31 @@ type ContentConfiguration struct { StartTime time.Time } -type ProcessorConfiguration struct { - ConcurrentPartitionLimit int // how many partitions to process concurrently -} - -type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error) +type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { - SubscriberConfig *SubscriberConfiguration - ContentConfig *ContentConfiguration - ProcessorConfig *ProcessorConfiguration - brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment - OnEachMessageFunc OnEachMessageFunc - OnCompletionFunc OnCompletionFunc - bootstrapBrokers []string - waitForMoreMessage bool - alreadyProcessedTsNs int64 + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration + brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse + brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest + OnEachMessageFunc OnEachMessageFunc + OnCompletionFunc OnCompletionFunc + bootstrapBrokers []string + waitForMoreMessage bool + activeProcessors map[topic.Partition]*ProcessorState + activeProcessorsLock sync.Mutex } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ - SubscriberConfig: subscriber, - ContentConfig: content, - ProcessorConfig: &processor, - bootstrapBrokers: bootstrapBrokers, - waitForMoreMessage: true, - alreadyProcessedTsNs: content.StartTime.UnixNano(), + SubscriberConfig: subscriber, + ContentConfig: content, + brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024), + brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024), + bootstrapBrokers: bootstrapBrokers, + waitForMoreMessage: true, + activeProcessors: make(map[topic.Partition]*ProcessorState), } } |
