diff options
Diffstat (limited to 'weed/mq/pub_balancer/balance_action.go')
| -rw-r--r-- | weed/mq/pub_balancer/balance_action.go | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/weed/mq/pub_balancer/balance_action.go b/weed/mq/pub_balancer/balance_action.go new file mode 100644 index 000000000..c29ec3469 --- /dev/null +++ b/weed/mq/pub_balancer/balance_action.go @@ -0,0 +1,58 @@ +package pub_balancer + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" +) + +// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish() +// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish() + +func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error { + if _, found := balancer.Brokers.Get(move.SourceBroker); !found { + return fmt.Errorf("source broker %s not found", move.SourceBroker) + } + if _, found := balancer.Brokers.Get(move.TargetBroker); !found { + return fmt.Errorf("target broker %s not found", move.TargetBroker) + } + + err := pb.WithBrokerGrpcClient(false, move.TargetBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{ + Topic: move.TopicPartition.Topic.ToPbTopic(), + BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + Partition: move.TopicPartition.ToPbPartition(), + }, + }, + IsLeader: true, + IsDraining: false, + }) + return err + }) + if err != nil { + return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.TargetBroker, err) + } + + err = pb.WithBrokerGrpcClient(false, move.SourceBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{ + Topic: move.TopicPartition.Topic.ToPbTopic(), + BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + Partition: move.TopicPartition.ToPbPartition(), + }, + }, + IsLeader: true, + IsDraining: true, + }) + return err + }) + if err != nil { + return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.SourceBroker, err) + } + + return nil + +} |
