aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go104
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go112
2 files changed, 112 insertions, 104 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 82b072f22..a73161fea 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -2,12 +2,9 @@ package sub_client
import (
"context"
- "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "io"
"time"
)
@@ -85,104 +82,3 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
time.Sleep(waitTime)
}
}
-
-func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
- // connect to the partition broker
- return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
-
- subscribeClient, err := client.SubscribeMessage(context.Background())
- if err != nil {
- return fmt.Errorf("create subscribe client: %v", err)
- }
-
- perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
- if perPartitionConcurrency <= 0 {
- perPartitionConcurrency = 1
- }
-
- if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Init{
- Init: &mq_pb.SubscribeMessageRequest_InitMessage{
- ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
- ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
- Topic: sub.ContentConfig.Topic.ToPbTopic(),
- PartitionOffset: &mq_pb.PartitionOffset{
- Partition: assigned.Partition,
- StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
- },
- Filter: sub.ContentConfig.Filter,
- FollowerBroker: assigned.FollowerBroker,
- Concurrency: perPartitionConcurrency,
- },
- },
- }); err != nil {
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
- }
-
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
-
- if sub.OnCompletionFunc != nil {
- defer sub.OnCompletionFunc()
- }
-
- type KeyedOffset struct {
- Key []byte
- Offset int64
- }
-
- partitionOffsetChan := make(chan KeyedOffset, 1024)
- defer func() {
- close(partitionOffsetChan)
- }()
- executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
-
- go func() {
- for ack := range partitionOffsetChan {
- subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Ack{
- Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Key: ack.Key,
- Sequence: ack.Offset,
- },
- },
- })
- }
- subscribeClient.CloseSend()
- }()
-
- var lastErr error
-
- for lastErr == nil {
- // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
- resp, err := subscribeClient.Recv()
- if err != nil {
- return fmt.Errorf("subscribe recv: %v", err)
- }
- if resp.Message == nil {
- glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
- continue
- }
- switch m := resp.Message.(type) {
- case *mq_pb.SubscribeMessageResponse_Data:
- executors.Execute(func() {
- processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
- if processErr == nil {
- partitionOffsetChan <- KeyedOffset{
- Key: m.Data.Key,
- Offset: m.Data.TsNs,
- }
- } else {
- lastErr = processErr
- }
- })
- case *mq_pb.SubscribeMessageResponse_Ctrl:
- // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
- if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
- return io.EOF
- }
- }
- }
-
- return lastErr
- })
-}
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
new file mode 100644
index 000000000..792376a69
--- /dev/null
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -0,0 +1,112 @@
+package sub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "io"
+)
+
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
+ // connect to the partition broker
+ return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+
+ subscribeClient, err := client.SubscribeMessage(context.Background())
+ if err != nil {
+ return fmt.Errorf("create subscribe client: %v", err)
+ }
+
+ perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
+ if perPartitionConcurrency <= 0 {
+ perPartitionConcurrency = 1
+ }
+
+ if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Init{
+ Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+ ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
+ ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
+ Topic: sub.ContentConfig.Topic.ToPbTopic(),
+ PartitionOffset: &mq_pb.PartitionOffset{
+ Partition: assigned.Partition,
+ StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
+ },
+ Filter: sub.ContentConfig.Filter,
+ FollowerBroker: assigned.FollowerBroker,
+ Concurrency: perPartitionConcurrency,
+ },
+ },
+ }); err != nil {
+ glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
+ }
+
+ glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
+
+ if sub.OnCompletionFunc != nil {
+ defer sub.OnCompletionFunc()
+ }
+
+ type KeyedOffset struct {
+ Key []byte
+ Offset int64
+ }
+
+ partitionOffsetChan := make(chan KeyedOffset, 1024)
+ defer func() {
+ close(partitionOffsetChan)
+ }()
+ executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
+
+ go func() {
+ for ack := range partitionOffsetChan {
+ subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Ack{
+ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+ Key: ack.Key,
+ Sequence: ack.Offset,
+ },
+ },
+ })
+ }
+ subscribeClient.CloseSend()
+ }()
+
+ var lastErr error
+
+ for lastErr == nil {
+ // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ resp, err := subscribeClient.Recv()
+ if err != nil {
+ return fmt.Errorf("subscribe recv: %v", err)
+ }
+ if resp.Message == nil {
+ glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ continue
+ }
+ switch m := resp.Message.(type) {
+ case *mq_pb.SubscribeMessageResponse_Data:
+ executors.Execute(func() {
+ processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
+ if processErr == nil {
+ partitionOffsetChan <- KeyedOffset{
+ Key: m.Data.Key,
+ Offset: m.Data.TsNs,
+ }
+ } else {
+ lastErr = processErr
+ }
+ })
+ case *mq_pb.SubscribeMessageResponse_Ctrl:
+ // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
+ if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
+ return io.EOF
+ }
+ }
+ }
+
+ return lastErr
+ })
+}