diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_balance.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_balance.go | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go new file mode 100644 index 000000000..c09161ff9 --- /dev/null +++ b/weed/mq/broker/broker_grpc_balance.go @@ -0,0 +1,31 @@ +package broker + +import ( + "context" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) { + if b.currentBalancer == "" { + return nil, status.Errorf(codes.Unavailable, "no balancer") + } + if !b.lockAsBalancer.IsLocked() { + proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + resp, err = client.BalanceTopics(ctx, request) + return nil + }) + if proxyErr != nil { + return nil, proxyErr + } + return resp, err + } + + ret := &mq_pb.BalanceTopicsResponse{} + + actions := b.Balancer.BalancePublishers() + err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption) + + return ret, err +} |
