aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/config.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-17 17:34:10 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-17 17:34:10 -0700
commitf5684839a2799656c0d8ddf68e191b2c4e29ed30 (patch)
tree1dde00794d6141a0b9c6d99f87180969b9dd89f8 /weed/messaging/msgclient/config.go
parent3a57aef7a910ba66d6026f73d1b3055c2d8a1e4d (diff)
downloadseaweedfs-f5684839a2799656c0d8ddf68e191b2c4e29ed30.tar.xz
seaweedfs-f5684839a2799656c0d8ddf68e191b2c4e29ed30.zip
add DeleteTopic
Diffstat (limited to 'weed/messaging/msgclient/config.go')
-rw-r--r--weed/messaging/msgclient/config.go63
1 files changed, 63 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go
new file mode 100644
index 000000000..2b9eba1a8
--- /dev/null
+++ b/weed/messaging/msgclient/config.go
@@ -0,0 +1,63 @@
+package msgclient
+
+import (
+ "context"
+ "log"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
+
+ return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(),
+ &messaging_pb.ConfigureTopicRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Configuration: &messaging_pb.TopicConfiguration{
+ PartitionCount: 0,
+ Collection: "",
+ Replication: "",
+ IsTransient: false,
+ Partitoning: 0,
+ },
+ })
+ return err
+ })
+
+}
+
+func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
+
+ return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ _, err := client.DeleteTopic(context.Background(),
+ &messaging_pb.DeleteTopicRequest{
+ Namespace: namespace,
+ Topic: topic,
+ })
+ return err
+ })
+}
+
+func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+
+ var lastErr error
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
+ if err == nil {
+ return nil
+ }
+ lastErr = err
+ }
+
+ return lastErr
+}