diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_lookup.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_lookup.go | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index e503b7d17..d6495e410 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -45,3 +45,38 @@ func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, ret := &mq_pb.CheckTopicPartitionsStatusResponse{} return ret, nil } + +func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { + if broker.currentBalancer == "" { + return nil, status.Errorf(codes.Unavailable, "no balancer") + } + if !broker.lockAsBalancer.IsLocked() { + proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + resp, err = client.ListTopics(ctx, request) + return nil + }) + if proxyErr != nil { + return nil, proxyErr + } + return resp, err + } + + ret := &mq_pb.ListTopicsResponse{} + knownTopics := make(map[*mq_pb.Topic]struct{}) + for brokerStatsItem := range broker.Balancer.Brokers.IterBuffered() { + _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val + for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { + topicPartitionStat := topicPartitionStatsItem.Val + topic := &mq_pb.Topic{ + Namespace: topicPartitionStat.TopicPartition.Namespace, + Name: topicPartitionStat.TopicPartition.Topic, + } + if _, found := knownTopics[topic]; found { + continue + } + ret.Topics = append(ret.Topics, topic) + } + } + + return ret, nil +} |
