aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-22 11:57:17 -0800
committerchrislu <chris.lu@gmail.com>2024-01-22 11:57:17 -0800
commitd268fbe18a4ebe3485a117d53407638584a4b730 (patch)
tree963cbf0f48ec0c09e6441e72b6b0c458f099c24b
parent3ff6b31d94d4a3fc006acd33c8bb9fdb8d96f023 (diff)
downloadseaweedfs-d268fbe18a4ebe3485a117d53407638584a4b730.tar.xz
seaweedfs-d268fbe18a4ebe3485a117d53407638584a4b730.zip
when configure, cancel existing assignments
-rw-r--r--weed/mq/broker/broker_grpc_configure.go5
1 files changed, 5 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index f5bcceb44..9b6cf9d2a 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -42,6 +42,11 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
if err == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
} else {
+ if resp!=nil && len(resp.BrokerPartitionAssignments) > 0 {
+ if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
+ glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
+ }
+ }
resp = &mq_pb.ConfigureTopicResponse{}
if b.Balancer.Brokers.IsEmpty() {
return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())