aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_balance.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_balance.go')
-rw-r--r--weed/mq/broker/broker_grpc_balance.go31
1 files changed, 31 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go
new file mode 100644
index 000000000..c09161ff9
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_balance.go
@@ -0,0 +1,31 @@
+package broker
+
+import (
+ "context"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
+ if b.currentBalancer == "" {
+ return nil, status.Errorf(codes.Unavailable, "no balancer")
+ }
+ if !b.lockAsBalancer.IsLocked() {
+ proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ resp, err = client.BalanceTopics(ctx, request)
+ return nil
+ })
+ if proxyErr != nil {
+ return nil, proxyErr
+ }
+ return resp, err
+ }
+
+ ret := &mq_pb.BalanceTopicsResponse{}
+
+ actions := b.Balancer.BalancePublishers()
+ err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
+
+ return ret, err
+}