diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:50 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:50 -0700 |
| commit | ba67e6ca2998e82eb23abf5c431bdf9a92e966ea (patch) | |
| tree | bcf8c521f4c428f4b2690843a11b9708ce013cc0 /weed/mq/client/sub_client | |
| parent | 9e4f98569898985ed285d8bb8a39b4ea5f095a98 (diff) | |
| download | seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.tar.xz seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.zip | |
api for sub
Diffstat (limited to 'weed/mq/client/sub_client')
| -rw-r--r-- | weed/mq/client/sub_client/lookup.go | 8 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 23 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 41 |
3 files changed, 45 insertions, 27 deletions
diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go index e836c4864..b6d2a8c53 100644 --- a/weed/mq/client/sub_client/lookup.go +++ b/weed/mq/client/sub_client/lookup.go @@ -10,13 +10,13 @@ import ( func (sub *TopicSubscriber) doLookup(brokerAddress string) error { err := pb.WithBrokerGrpcClient(true, brokerAddress, - sub.grpcDialOption, + sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { lookupResp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{ Topic: &mq_pb.Topic{ - Namespace: sub.namespace, - Name: sub.topic, + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, }, IsForPublish: false, }) @@ -28,7 +28,7 @@ func (sub *TopicSubscriber) doLookup(brokerAddress string) error { }) if err != nil { - return fmt.Errorf("lookup topic %s/%s: %v", sub.namespace, sub.topic, err) + return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } return nil } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index 622b88828..3667a2bdf 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -8,33 +8,34 @@ import ( "sync" ) -type EachMessageFunc func(key, value []byte) (shouldContinue bool) -type FinalFunc func() +// Subscribe subscribes to a topic's specified partitions. +// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. -func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn FinalFunc) error { +func (sub *TopicSubscriber) Subscribe() error { var wg sync.WaitGroup for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { brokerAddress := brokerPartitionAssignment.LeaderBroker - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.grpcDialOption) + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) if err != nil { return fmt.Errorf("dial broker %s: %v", brokerAddress, err) } brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ Consumer: &mq_pb.SubscribeRequest_Consumer{ - ConsumerGroup: sub.config.ConsumerGroup, - ConsumerId: sub.config.ConsumerId, + ConsumerGroup: sub.SubscriberConfig.GroupId, + ConsumerId: sub.SubscriberConfig.GroupInstanceId, }, Cursor: &mq_pb.SubscribeRequest_Cursor{ Topic: &mq_pb.Topic{ - Namespace: sub.namespace, - Name: sub.topic, + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, }, Partition: &mq_pb.Partition{ RingSize: brokerPartitionAssignment.Partition.RingSize, RangeStart: brokerPartitionAssignment.Partition.RangeStart, RangeStop: brokerPartitionAssignment.Partition.RangeStop, }, + Filter: sub.ContentConfig.Filter, }, }) if err != nil { @@ -43,8 +44,8 @@ func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn Fin wg.Add(1) go func() { defer wg.Done() - if finalFn != nil { - defer finalFn() + if sub.OnCompletionFunc != nil { + defer sub.OnCompletionFunc() } for { resp, err := subscribeClient.Recv() @@ -57,7 +58,7 @@ func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn Fin } switch m := resp.Message.(type) { case *mq_pb.SubscribeResponse_Data: - if !eachMessageFn(m.Data.Key, m.Data.Value) { + if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { return } case *mq_pb.SubscribeResponse_Ctrl: diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index a193730b0..404d05222 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -3,28 +3,37 @@ package sub_client import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type SubscriberConfiguration struct { - ConsumerGroup string - ConsumerId string + ClientId string + GroupId string + GroupInstanceId string + BootstrapServers []string + GrpcDialOption grpc.DialOption } +type ContentConfiguration struct { + Namespace string + Topic string + Filter string +} + +type OnEachMessageFunc func(key, value []byte) (shouldContinue bool) +type OnCompletionFunc func() + type TopicSubscriber struct { - config *SubscriberConfiguration - namespace string - topic string + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment - grpcDialOption grpc.DialOption + OnEachMessageFunc OnEachMessageFunc + OnCompletionFunc OnCompletionFunc } -func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber { +func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ - config: config, - namespace: namespace, - topic: topic, - grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + SubscriberConfig: subscriber, + ContentConfig: content, } } @@ -34,3 +43,11 @@ func (sub *TopicSubscriber) Connect(bootstrapBroker string) error { } return nil } + +func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) { + sub.OnEachMessageFunc = onEachMessageFn +} + +func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) { + sub.OnCompletionFunc = onCompeletionFn +} |
