aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer/allocate.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer/allocate.go')
-rw-r--r--weed/mq/pub_balancer/allocate.go62
1 files changed, 13 insertions, 49 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index d7632f8d6..ae32188ec 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -48,20 +48,11 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32)
return pickedBrokers
}
-// reservoir sampling select N brokers from the active brokers, with exclusion of the excluded brokers
-func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBrokers []string) []string {
- // convert the excluded brokers to a map
- excludedBrokerMap := make(map[string]bool)
- for _, broker := range excludedBrokers {
- excludedBrokerMap[broker] = true
- }
- if excludedLeadBroker != "" {
- excludedBrokerMap[excludedLeadBroker] = true
- }
-
+// reservoir sampling select N brokers from the active brokers, with exclusion of the excluded broker
+func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBroker string) []string {
pickedBrokers := make([]string, 0, count)
for i, broker := range brokers {
- if _, found := excludedBrokerMap[broker]; found {
+ if broker == excludedBroker {
continue
}
if len(pickedBrokers) < count {
@@ -102,21 +93,15 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
assignment.LeaderBroker = ""
count++
}
- for i := 0; i < followerCount; i++ {
- if i >= len(assignment.FollowerBrokers) {
- count++
- continue
- }
- if assignment.FollowerBrokers[i] == "" {
- count++
- } else if _, found := activeBrokers.Get(assignment.FollowerBrokers[i]); !found {
- assignment.FollowerBrokers[i] = ""
- count++
- }
+ if assignment.FollowerBroker == "" {
+ count++
+ } else if _, found := activeBrokers.Get(assignment.FollowerBroker); !found {
+ assignment.FollowerBroker = ""
+ count++
}
if count > 0 {
- pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers)
+ pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBroker)
i := 0
if assignment.LeaderBroker == "" {
if i < len(pickedBrokers) {
@@ -125,34 +110,13 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
hasChanges = true
}
}
-
- hasEmptyFollowers := false
- j := 0
- for ; j < len(assignment.FollowerBrokers); j++ {
- if assignment.FollowerBrokers[j] == "" {
+ if assignment.FollowerBroker == "" {
+ if i < len(pickedBrokers) {
+ assignment.FollowerBroker = pickedBrokers[i]
+ i++
hasChanges = true
- if i < len(pickedBrokers) {
- assignment.FollowerBrokers[j] = pickedBrokers[i]
- i++
- } else {
- hasEmptyFollowers = true
- }
}
}
- if hasEmptyFollowers {
- var followerBrokers []string
- for _, follower := range assignment.FollowerBrokers {
- if follower != "" {
- followerBrokers = append(followerBrokers, follower)
- }
- }
- assignment.FollowerBrokers = followerBrokers
- }
-
- if i < len(pickedBrokers) {
- assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...)
- hasChanges = true
- }
}
}