aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/msgclient/config.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/msgclient/config.go')
-rw-r--r--weed/mq/msgclient/config.go63
1 files changed, 63 insertions, 0 deletions
diff --git a/weed/mq/msgclient/config.go b/weed/mq/msgclient/config.go
new file mode 100644
index 000000000..263ee856e
--- /dev/null
+++ b/weed/mq/msgclient/config.go
@@ -0,0 +1,63 @@
+package msgclient
+
+import (
+ "context"
+ "log"
+
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
+)
+
+func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
+
+ return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(),
+ &mq_pb.ConfigureTopicRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Configuration: &mq_pb.TopicConfiguration{
+ PartitionCount: 0,
+ Collection: "",
+ Replication: "",
+ IsTransient: false,
+ Partitoning: 0,
+ },
+ })
+ return err
+ })
+
+}
+
+func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
+
+ return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.DeleteTopic(context.Background(),
+ &mq_pb.DeleteTopicRequest{
+ Namespace: namespace,
+ Topic: topic,
+ })
+ return err
+ })
+}
+
+func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error {
+
+ var lastErr error
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection))
+ if err == nil {
+ return nil
+ }
+ lastErr = err
+ }
+
+ return lastErr
+}