aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-07 10:45:38 -0800
committerchrislu <chris.lu@gmail.com>2024-03-07 10:45:38 -0800
commit49869eec83dba47018a433181c392f69fea23dfe (patch)
tree2d960d93656956b20e191be4502d0af01d7a915b
parent33ab6cfcf783c8b8c78da7220a6d9474615199c9 (diff)
downloadseaweedfs-49869eec83dba47018a433181c392f69fea23dfe.tar.xz
seaweedfs-49869eec83dba47018a433181c392f69fea23dfe.zip
edge cases for broker assignment
-rw-r--r--weed/mq/pub_balancer/allocate.go18
-rw-r--r--weed/mq/pub_balancer/allocate_test.go14
2 files changed, 23 insertions, 9 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 1dd1ef9d8..39d91bef3 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -27,13 +27,8 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
assignments = append(assignments, assignment)
}
- // pick the brokers
- pickedBrokers := pickBrokers(brokers, partitionCount)
+ EnsureAssignmentsToActiveBrokers(brokers, 1, assignments)
- // assign the partitions to brokers
- for i, assignment := range assignments {
- assignment.LeaderBroker = pickedBrokers[i]
- }
glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
return
}
@@ -91,6 +86,8 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string,
// EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
+ glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
+
candidates := make([]string, 0, activeBrokers.Count())
for brokerStatsItem := range activeBrokers.IterBuffered() {
candidates = append(candidates, brokerStatsItem.Key)
@@ -122,9 +119,11 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers)
i := 0
if assignment.LeaderBroker == "" {
- assignment.LeaderBroker = pickedBrokers[i]
- i++
- hasChanges = true
+ if i < len(pickedBrokers) {
+ assignment.LeaderBroker = pickedBrokers[i]
+ i++
+ hasChanges = true
+ }
}
hasEmptyFollowers := false
@@ -158,5 +157,6 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
}
+ glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
return
}
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index b585219dc..5f6342e99 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -210,6 +210,20 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
hasChanges: false,
},
{
+ name: "test low active brokers with one follower",
+ args: args{
+ activeBrokers: lowActiveBrokers,
+ followerCount: 1,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ {
name: "test single active broker",
args: args{
activeBrokers: singleActiveBroker,