diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:50 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:50 -0700 |
| commit | ba67e6ca2998e82eb23abf5c431bdf9a92e966ea (patch) | |
| tree | bcf8c521f4c428f4b2690843a11b9708ce013cc0 /weed/mq/client/cmd | |
| parent | 9e4f98569898985ed285d8bb8a39b4ea5f095a98 (diff) | |
| download | seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.tar.xz seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.zip | |
api for sub
Diffstat (limited to 'weed/mq/client/cmd')
| -rw-r--r-- | weed/mq/client/cmd/weed_sub/subscriber.go | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 529d09a4d..1ec24f406 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -3,27 +3,41 @@ package main import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) func main() { - subscriber := sub_client.NewTopicSubscriber( - &sub_client.SubscriberConfiguration{ - ConsumerGroup: "test", - ConsumerId: "test", - }, - "test", "test") + subscriberConfig := &sub_client.SubscriberConfiguration{ + ClientId: "testSubscriber", + GroupId: "test", + GroupInstanceId: "test", + GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + contentConfig := &sub_client.ContentConfiguration{ + Namespace: "test", + Topic: "test", + Filter: "", + } + + subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig) if err := subscriber.Connect("localhost:17777"); err != nil { fmt.Println(err) return } - if err := subscriber.Subscribe(func(key, value []byte) bool { + subscriber.SetEachMessageFunc(func(key, value []byte) bool { println(string(key), "=>", string(value)) return true - }, func() { + }) + + subscriber.SetCompletionFunc(func() { println("done subscribing") - }); err != nil { + }) + + if err := subscriber.Subscribe(); err != nil { fmt.Println(err) } |
