diff options
| -rw-r--r-- | weed/mq/pub_balancer/allocate.go | 20 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/allocate_test.go | 19 |
2 files changed, 37 insertions, 2 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 12d83d01a..1dd1ef9d8 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -126,14 +126,30 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * i++ hasChanges = true } + + hasEmptyFollowers := false j := 0 for ; j<len(assignment.FollowerBrokers); j++ { if assignment.FollowerBrokers[j] == "" { - assignment.FollowerBrokers[j] = pickedBrokers[i] - i++ hasChanges = true + if i < len(pickedBrokers) { + assignment.FollowerBrokers[j] = pickedBrokers[i] + i++ + } else { + hasEmptyFollowers = true + } + } + } + if hasEmptyFollowers { + var followerBrokers []string + for _, follower := range assignment.FollowerBrokers { + if follower != "" { + followerBrokers = append(followerBrokers, follower) + } } + assignment.FollowerBrokers = followerBrokers } + if i < len(pickedBrokers) { assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...) hasChanges = true diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 8b0bc3603..b585219dc 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -83,6 +83,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { lowActiveBrokers := cmap.New[*BrokerStats]() lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{}) lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{}) + singleActiveBroker := cmap.New[*BrokerStats]() + singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{}) tests := []struct { name string args args @@ -207,6 +209,23 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { }, hasChanges: false, }, + { + name: "test single active broker", + args: args{ + activeBrokers: singleActiveBroker, + followerCount: 3, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:2", + }, + }, + }, + }, + hasChanges: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { |
