aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_mq_topic_list.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-24 14:22:11 -0700
committerchrislu <chris.lu@gmail.com>2023-09-24 22:00:43 -0700
commit0361c321b40e5b0ff13edf01c8bb1b095f612caf (patch)
tree9117922ea1663c9a81ada68dad10f35528ff6171 /weed/shell/command_mq_topic_list.go
parent0f8168c0c928bba3d2f48b0680d3bdce9c617559 (diff)
downloadseaweedfs-0361c321b40e5b0ff13edf01c8bb1b095f612caf.tar.xz
seaweedfs-0361c321b40e5b0ff13edf01c8bb1b095f612caf.zip
add CreateTopic API
Diffstat (limited to 'weed/shell/command_mq_topic_list.go')
-rw-r--r--weed/shell/command_mq_topic_list.go37
1 files changed, 35 insertions, 2 deletions
diff --git a/weed/shell/command_mq_topic_list.go b/weed/shell/command_mq_topic_list.go
index a069b0614..5d18d8755 100644
--- a/weed/shell/command_mq_topic_list.go
+++ b/weed/shell/command_mq_topic_list.go
@@ -1,7 +1,10 @@
package shell
import (
+ "context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io"
)
@@ -20,9 +23,39 @@ func (c *commandMqTopicList) Help() string {
return `print out all topics`
}
-func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
- fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory)
+ brokerBalancer, err := findBrokerBalancer(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ //pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client pb.SeaweedMessagingClient) error {
+ // resp, err := client.ListTopics(context.Background(), &pb.ListTopicsRequest{})
+ // if err != nil {
+ // return err
+ // }
+ // for _, topic := range resp.Topics {
+ // fmt.Fprintf(writer, "%s\n", topic)
+ // }
+ // return nil
+ //})
+
+ fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
return nil
}
+
+func findBrokerBalancer(commandEnv *CommandEnv) (brokerBalancer string, err error) {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
+ Name: balancer.LockBrokerBalancer,
+ })
+ if err != nil {
+ return err
+ }
+ brokerBalancer = resp.Owner
+ return nil
+ })
+ return
+}