aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_lookup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_lookup.go')
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go35
1 files changed, 35 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index e503b7d17..d6495e410 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -45,3 +45,38 @@ func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context,
ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
return ret, nil
}
+
+func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
+ if broker.currentBalancer == "" {
+ return nil, status.Errorf(codes.Unavailable, "no balancer")
+ }
+ if !broker.lockAsBalancer.IsLocked() {
+ proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ resp, err = client.ListTopics(ctx, request)
+ return nil
+ })
+ if proxyErr != nil {
+ return nil, proxyErr
+ }
+ return resp, err
+ }
+
+ ret := &mq_pb.ListTopicsResponse{}
+ knownTopics := make(map[*mq_pb.Topic]struct{})
+ for brokerStatsItem := range broker.Balancer.Brokers.IterBuffered() {
+ _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
+ for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
+ topicPartitionStat := topicPartitionStatsItem.Val
+ topic := &mq_pb.Topic{
+ Namespace: topicPartitionStat.TopicPartition.Namespace,
+ Name: topicPartitionStat.TopicPartition.Topic,
+ }
+ if _, found := knownTopics[topic]; found {
+ continue
+ }
+ ret.Topics = append(ret.Topics, topic)
+ }
+ }
+
+ return ret, nil
+}