aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/pub_balancer/allocate.go5
-rw-r--r--weed/mq/pub_balancer/allocate_test.go20
2 files changed, 24 insertions, 1 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index bd1ccb6dd..12d83d01a 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -89,6 +89,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string,
return pickedBrokers
}
+// EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
candidates := make([]string, 0, activeBrokers.Count())
for brokerStatsItem := range activeBrokers.IterBuffered() {
@@ -123,20 +124,22 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
if assignment.LeaderBroker == "" {
assignment.LeaderBroker = pickedBrokers[i]
i++
+ hasChanges = true
}
j := 0
for ; j<len(assignment.FollowerBrokers); j++ {
if assignment.FollowerBrokers[j] == "" {
assignment.FollowerBrokers[j] = pickedBrokers[i]
i++
+ hasChanges = true
}
}
if i < len(pickedBrokers) {
assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...)
+ hasChanges = true
}
}
- hasChanges = hasChanges || count > 0
}
return
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 2f298d3e5..8b0bc3603 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -80,6 +80,9 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{})
activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{})
activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{})
+ lowActiveBrokers := cmap.New[*BrokerStats]()
+ lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
+ lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
tests := []struct {
name string
args args
@@ -187,6 +190,23 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
},
hasChanges: true,
},
+ {
+ name: "test low active brokers",
+ args: args{
+ activeBrokers: lowActiveBrokers,
+ followerCount: 3,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:2",
+ },
+ },
+ },
+ },
+ hasChanges: false,
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {