diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:50 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:50 -0700 |
| commit | ba67e6ca2998e82eb23abf5c431bdf9a92e966ea (patch) | |
| tree | bcf8c521f4c428f4b2690843a11b9708ce013cc0 /weed/mq | |
| parent | 9e4f98569898985ed285d8bb8a39b4ea5f095a98 (diff) | |
| download | seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.tar.xz seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.zip | |
api for sub
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub/subscriber.go | 32 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/lookup.go | 8 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 23 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 41 |
5 files changed, 69 insertions, 37 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 9a7e53ca1..ad65df2d1 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -24,7 +24,7 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream return nil } - clientName := fmt.Sprintf("%s/%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId) + clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId) localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error { value := logEntry.GetData() diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 529d09a4d..1ec24f406 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -3,27 +3,41 @@ package main import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) func main() { - subscriber := sub_client.NewTopicSubscriber( - &sub_client.SubscriberConfiguration{ - ConsumerGroup: "test", - ConsumerId: "test", - }, - "test", "test") + subscriberConfig := &sub_client.SubscriberConfiguration{ + ClientId: "testSubscriber", + GroupId: "test", + GroupInstanceId: "test", + GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + contentConfig := &sub_client.ContentConfiguration{ + Namespace: "test", + Topic: "test", + Filter: "", + } + + subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig) if err := subscriber.Connect("localhost:17777"); err != nil { fmt.Println(err) return } - if err := subscriber.Subscribe(func(key, value []byte) bool { + subscriber.SetEachMessageFunc(func(key, value []byte) bool { println(string(key), "=>", string(value)) return true - }, func() { + }) + + subscriber.SetCompletionFunc(func() { println("done subscribing") - }); err != nil { + }) + + if err := subscriber.Subscribe(); err != nil { fmt.Println(err) } diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go index e836c4864..b6d2a8c53 100644 --- a/weed/mq/client/sub_client/lookup.go +++ b/weed/mq/client/sub_client/lookup.go @@ -10,13 +10,13 @@ import ( func (sub *TopicSubscriber) doLookup(brokerAddress string) error { err := pb.WithBrokerGrpcClient(true, brokerAddress, - sub.grpcDialOption, + sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { lookupResp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{ Topic: &mq_pb.Topic{ - Namespace: sub.namespace, - Name: sub.topic, + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, }, IsForPublish: false, }) @@ -28,7 +28,7 @@ func (sub *TopicSubscriber) doLookup(brokerAddress string) error { }) if err != nil { - return fmt.Errorf("lookup topic %s/%s: %v", sub.namespace, sub.topic, err) + return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } return nil } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index 622b88828..3667a2bdf 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -8,33 +8,34 @@ import ( "sync" ) -type EachMessageFunc func(key, value []byte) (shouldContinue bool) -type FinalFunc func() +// Subscribe subscribes to a topic's specified partitions. +// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. -func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn FinalFunc) error { +func (sub *TopicSubscriber) Subscribe() error { var wg sync.WaitGroup for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { brokerAddress := brokerPartitionAssignment.LeaderBroker - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.grpcDialOption) + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) if err != nil { return fmt.Errorf("dial broker %s: %v", brokerAddress, err) } brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ Consumer: &mq_pb.SubscribeRequest_Consumer{ - ConsumerGroup: sub.config.ConsumerGroup, - ConsumerId: sub.config.ConsumerId, + ConsumerGroup: sub.SubscriberConfig.GroupId, + ConsumerId: sub.SubscriberConfig.GroupInstanceId, }, Cursor: &mq_pb.SubscribeRequest_Cursor{ Topic: &mq_pb.Topic{ - Namespace: sub.namespace, - Name: sub.topic, + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, }, Partition: &mq_pb.Partition{ RingSize: brokerPartitionAssignment.Partition.RingSize, RangeStart: brokerPartitionAssignment.Partition.RangeStart, RangeStop: brokerPartitionAssignment.Partition.RangeStop, }, + Filter: sub.ContentConfig.Filter, }, }) if err != nil { @@ -43,8 +44,8 @@ func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn Fin wg.Add(1) go func() { defer wg.Done() - if finalFn != nil { - defer finalFn() + if sub.OnCompletionFunc != nil { + defer sub.OnCompletionFunc() } for { resp, err := subscribeClient.Recv() @@ -57,7 +58,7 @@ func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn Fin } switch m := resp.Message.(type) { case *mq_pb.SubscribeResponse_Data: - if !eachMessageFn(m.Data.Key, m.Data.Value) { + if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { return } case *mq_pb.SubscribeResponse_Ctrl: diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index a193730b0..404d05222 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -3,28 +3,37 @@ package sub_client import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type SubscriberConfiguration struct { - ConsumerGroup string - ConsumerId string + ClientId string + GroupId string + GroupInstanceId string + BootstrapServers []string + GrpcDialOption grpc.DialOption } +type ContentConfiguration struct { + Namespace string + Topic string + Filter string +} + +type OnEachMessageFunc func(key, value []byte) (shouldContinue bool) +type OnCompletionFunc func() + type TopicSubscriber struct { - config *SubscriberConfiguration - namespace string - topic string + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment - grpcDialOption grpc.DialOption + OnEachMessageFunc OnEachMessageFunc + OnCompletionFunc OnCompletionFunc } -func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber { +func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ - config: config, - namespace: namespace, - topic: topic, - grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + SubscriberConfig: subscriber, + ContentConfig: content, } } @@ -34,3 +43,11 @@ func (sub *TopicSubscriber) Connect(bootstrapBroker string) error { } return nil } + +func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) { + sub.OnEachMessageFunc = onEachMessageFn +} + +func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) { + sub.OnCompletionFunc = onCompeletionFn +} |
