aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/cmd
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-04 21:43:50 -0700
committerchrislu <chris.lu@gmail.com>2023-09-04 21:43:50 -0700
commitba67e6ca2998e82eb23abf5c431bdf9a92e966ea (patch)
treebcf8c521f4c428f4b2690843a11b9708ce013cc0 /weed/mq/client/cmd
parent9e4f98569898985ed285d8bb8a39b4ea5f095a98 (diff)
downloadseaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.tar.xz
seaweedfs-ba67e6ca2998e82eb23abf5c431bdf9a92e966ea.zip
api for sub
Diffstat (limited to 'weed/mq/client/cmd')
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go32
1 files changed, 23 insertions, 9 deletions
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index 529d09a4d..1ec24f406 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -3,27 +3,41 @@ package main
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
func main() {
- subscriber := sub_client.NewTopicSubscriber(
- &sub_client.SubscriberConfiguration{
- ConsumerGroup: "test",
- ConsumerId: "test",
- },
- "test", "test")
+ subscriberConfig := &sub_client.SubscriberConfiguration{
+ ClientId: "testSubscriber",
+ GroupId: "test",
+ GroupInstanceId: "test",
+ GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ }
+
+ contentConfig := &sub_client.ContentConfiguration{
+ Namespace: "test",
+ Topic: "test",
+ Filter: "",
+ }
+
+ subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig)
if err := subscriber.Connect("localhost:17777"); err != nil {
fmt.Println(err)
return
}
- if err := subscriber.Subscribe(func(key, value []byte) bool {
+ subscriber.SetEachMessageFunc(func(key, value []byte) bool {
println(string(key), "=>", string(value))
return true
- }, func() {
+ })
+
+ subscriber.SetCompletionFunc(func() {
println("done subscribing")
- }); err != nil {
+ })
+
+ if err := subscriber.Subscribe(); err != nil {
fmt.Println(err)
}