diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-24 14:22:11 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-24 22:00:43 -0700 |
| commit | 0361c321b40e5b0ff13edf01c8bb1b095f612caf (patch) | |
| tree | 9117922ea1663c9a81ada68dad10f35528ff6171 /weed/shell | |
| parent | 0f8168c0c928bba3d2f48b0680d3bdce9c617559 (diff) | |
| download | seaweedfs-0361c321b40e5b0ff13edf01c8bb1b095f612caf.tar.xz seaweedfs-0361c321b40e5b0ff13edf01c8bb1b095f612caf.zip | |
add CreateTopic API
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_mq_topic_create.go | 65 | ||||
| -rw-r--r-- | weed/shell/command_mq_topic_list.go | 37 |
2 files changed, 100 insertions, 2 deletions
diff --git a/weed/shell/command_mq_topic_create.go b/weed/shell/command_mq_topic_create.go new file mode 100644 index 000000000..b09f94451 --- /dev/null +++ b/weed/shell/command_mq_topic_create.go @@ -0,0 +1,65 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandMqTopicCreate{}) +} + +type commandMqTopicCreate struct { +} + +func (c *commandMqTopicCreate) Name() string { + return "mq.topic.create" +} + +func (c *commandMqTopicCreate) Help() string { + return `create a topic with a given name + + Example: + mq.topic.create -namespace <namespace> -topic <topic_name> -partition_count <partition_count> +` +} + +func (c *commandMqTopicCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { + + // parse parameters + mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + namespace := mqCommand.String("namespace", "", "namespace name") + topicName := mqCommand.String("topic", "", "topic name") + partitionCount := mqCommand.Int("partitionCount", 6, "partition count") + if err := mqCommand.Parse(args); err != nil { + return err + } + + // find the broker balancer + brokerBalancer, err := findBrokerBalancer(commandEnv) + if err != nil { + return err + } + fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer) + + // create topic + return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + resp, err := client.CreateTopic(context.Background(), &mq_pb.CreateTopicRequest{ + Topic: &mq_pb.Topic{ + Namespace: *namespace, + Name: *topicName, + }, + PartitionCount: int32(*partitionCount), + }) + if err != nil { + return err + } + fmt.Fprintf(writer, "response: %+v\n", resp) + return nil + }) + +} diff --git a/weed/shell/command_mq_topic_list.go b/weed/shell/command_mq_topic_list.go index a069b0614..5d18d8755 100644 --- a/weed/shell/command_mq_topic_list.go +++ b/weed/shell/command_mq_topic_list.go @@ -1,7 +1,10 @@ package shell import ( + "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/balancer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "io" ) @@ -20,9 +23,39 @@ func (c *commandMqTopicList) Help() string { return `print out all topics` } -func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { - fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory) + brokerBalancer, err := findBrokerBalancer(commandEnv) + if err != nil { + return err + } + + //pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client pb.SeaweedMessagingClient) error { + // resp, err := client.ListTopics(context.Background(), &pb.ListTopicsRequest{}) + // if err != nil { + // return err + // } + // for _, topic := range resp.Topics { + // fmt.Fprintf(writer, "%s\n", topic) + // } + // return nil + //}) + + fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer) return nil } + +func findBrokerBalancer(commandEnv *CommandEnv) (brokerBalancer string, err error) { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ + Name: balancer.LockBrokerBalancer, + }) + if err != nil { + return err + } + brokerBalancer = resp.Owner + return nil + }) + return +} |
