diff options
Diffstat (limited to 'weed/mq/pub_balancer/allocate.go')
| -rw-r--r-- | weed/mq/pub_balancer/allocate.go | 62 |
1 files changed, 13 insertions, 49 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index d7632f8d6..ae32188ec 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -48,20 +48,11 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) return pickedBrokers } -// reservoir sampling select N brokers from the active brokers, with exclusion of the excluded brokers -func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBrokers []string) []string { - // convert the excluded brokers to a map - excludedBrokerMap := make(map[string]bool) - for _, broker := range excludedBrokers { - excludedBrokerMap[broker] = true - } - if excludedLeadBroker != "" { - excludedBrokerMap[excludedLeadBroker] = true - } - +// reservoir sampling select N brokers from the active brokers, with exclusion of the excluded broker +func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBroker string) []string { pickedBrokers := make([]string, 0, count) for i, broker := range brokers { - if _, found := excludedBrokerMap[broker]; found { + if broker == excludedBroker { continue } if len(pickedBrokers) < count { @@ -102,21 +93,15 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * assignment.LeaderBroker = "" count++ } - for i := 0; i < followerCount; i++ { - if i >= len(assignment.FollowerBrokers) { - count++ - continue - } - if assignment.FollowerBrokers[i] == "" { - count++ - } else if _, found := activeBrokers.Get(assignment.FollowerBrokers[i]); !found { - assignment.FollowerBrokers[i] = "" - count++ - } + if assignment.FollowerBroker == "" { + count++ + } else if _, found := activeBrokers.Get(assignment.FollowerBroker); !found { + assignment.FollowerBroker = "" + count++ } if count > 0 { - pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers) + pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBroker) i := 0 if assignment.LeaderBroker == "" { if i < len(pickedBrokers) { @@ -125,34 +110,13 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * hasChanges = true } } - - hasEmptyFollowers := false - j := 0 - for ; j < len(assignment.FollowerBrokers); j++ { - if assignment.FollowerBrokers[j] == "" { + if assignment.FollowerBroker == "" { + if i < len(pickedBrokers) { + assignment.FollowerBroker = pickedBrokers[i] + i++ hasChanges = true - if i < len(pickedBrokers) { - assignment.FollowerBrokers[j] = pickedBrokers[i] - i++ - } else { - hasEmptyFollowers = true - } } } - if hasEmptyFollowers { - var followerBrokers []string - for _, follower := range assignment.FollowerBrokers { - if follower != "" { - followerBrokers = append(followerBrokers, follower) - } - } - assignment.FollowerBrokers = followerBrokers - } - - if i < len(pickedBrokers) { - assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...) - hasChanges = true - } } } |
