aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go6
-rw-r--r--weed/mq/pub_balancer/allocate.go101
-rw-r--r--weed/mq/pub_balancer/allocate_test.go132
3 files changed, 214 insertions, 25 deletions
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index dbd0d97c7..4bcb62931 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -88,9 +88,9 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid
- addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)
- if len(addedAssignments) > 0 || len(updatedAssignments) > 0 {
- glog.V(0).Infof("topic %v partition assignments added: %v updated: %v", t, addedAssignments, updatedAssignments)
+ hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
+ if hasChanges {
+ glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
return err
}
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 520f6bcf2..bd1ccb6dd 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -38,7 +38,7 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
return
}
-// for now: randomly pick brokers
+// randomly pick n brokers, which may contain duplicates
// TODO pick brokers based on the broker stats
func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
candidates := make([]string, 0, brokers.Count())
@@ -47,39 +47,96 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32)
}
pickedBrokers := make([]string, 0, count)
for i := int32(0); i < count; i++ {
- p := rand.Int() % len(candidates)
- if p < 0 {
- p = -p
- }
+ p := rand.Intn(len(candidates))
pickedBrokers = append(pickedBrokers, candidates[p])
}
return pickedBrokers
}
-func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) {
- for _, assignment := range assignments {
- if assignment.LeaderBroker == "" {
- addedAssignments = append(addedAssignments, assignment)
+// reservoir sampling select N brokers from the active brokers, with exclusion of the excluded brokers
+func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBrokers []string) []string {
+ // convert the excluded brokers to a map
+ excludedBrokerMap := make(map[string]bool)
+ for _, broker := range excludedBrokers {
+ excludedBrokerMap[broker] = true
+ }
+ if excludedLeadBroker != "" {
+ excludedBrokerMap[excludedLeadBroker] = true
+ }
+
+ pickedBrokers := make([]string, 0, count)
+ for i, broker := range brokers {
+ if _, found := excludedBrokerMap[broker]; found {
continue
}
- if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
- updatedAssignments = append(updatedAssignments, assignment)
- continue
+ if len(pickedBrokers) < count {
+ pickedBrokers = append(pickedBrokers, broker)
+ } else {
+ j := rand.Intn(i + 1)
+ if j < count {
+ pickedBrokers[j] = broker
+ }
}
}
- // pick the brokers with the least number of partitions
- if len(addedAssignments) > 0 {
- pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments)))
- for i, assignment := range addedAssignments {
- assignment.LeaderBroker = pickedBrokers[i]
- }
+ // shuffle the picked brokers
+ count = len(pickedBrokers)
+ for i := 0; i < count; i++ {
+ j := rand.Intn(count)
+ pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i]
+ }
+
+ return pickedBrokers
+}
+
+func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
+ candidates := make([]string, 0, activeBrokers.Count())
+ for brokerStatsItem := range activeBrokers.IterBuffered() {
+ candidates = append(candidates, brokerStatsItem.Key)
}
- if len(updatedAssignments) == 0 {
- pickedBrokers := pickBrokers(activeBrokers, int32(len(updatedAssignments)))
- for i, assignment := range updatedAssignments {
- assignment.LeaderBroker = pickedBrokers[i]
+
+ for _, assignment := range assignments {
+ // count how many brokers are needed
+ count := 0
+ if assignment.LeaderBroker == "" {
+ count++
+ } else if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
+ assignment.LeaderBroker = ""
+ count++
+ }
+ for i:=0; i<followerCount; i++ {
+ if i >= len(assignment.FollowerBrokers) {
+ count++
+ continue
+ }
+ if assignment.FollowerBrokers[i] == "" {
+ count++
+ } else if _, found := activeBrokers.Get(assignment.FollowerBrokers[i]); !found {
+ assignment.FollowerBrokers[i] = ""
+ count++
+ }
}
+
+ if count > 0 {
+ pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers)
+ i := 0
+ if assignment.LeaderBroker == "" {
+ assignment.LeaderBroker = pickedBrokers[i]
+ i++
+ }
+ j := 0
+ for ; j<len(assignment.FollowerBrokers); j++ {
+ if assignment.FollowerBrokers[j] == "" {
+ assignment.FollowerBrokers[j] = pickedBrokers[i]
+ i++
+ }
+ }
+ if i < len(pickedBrokers) {
+ assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...)
+ }
+ }
+
+ hasChanges = hasChanges || count > 0
}
return
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 3a1598fa0..2f298d3e5 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -1,6 +1,7 @@
package pub_balancer
import (
+ "fmt"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/stretchr/testify/assert"
@@ -65,3 +66,134 @@ func testThem(t *testing.T, tests []struct {
})
}
}
+
+func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
+ type args struct {
+ activeBrokers cmap.ConcurrentMap[string, *BrokerStats]
+ followerCount int
+ assignments []*mq_pb.BrokerPartitionAssignment
+ }
+ activeBrokers := cmap.New[*BrokerStats]()
+ activeBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
+ activeBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
+ activeBrokers.SetIfAbsent("localhost:3", &BrokerStats{})
+ activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{})
+ activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{})
+ activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{})
+ tests := []struct {
+ name string
+ args args
+ hasChanges bool
+ }{
+ {
+ name: "test empty leader",
+ args: args{
+ activeBrokers: activeBrokers,
+ followerCount: 1,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:2",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ {
+ name: "test empty follower",
+ args: args{
+ activeBrokers: activeBrokers,
+ followerCount: 1,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ {
+ name: "test dead follower",
+ args: args{
+ activeBrokers: activeBrokers,
+ followerCount: 1,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:200",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ {
+ name: "test dead leader and follower",
+ args: args{
+ activeBrokers: activeBrokers,
+ followerCount: 1,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:100",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:200",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ {
+ name: "test missing two followers",
+ args: args{
+ activeBrokers: activeBrokers,
+ followerCount: 3,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:2",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ {
+ name: "test missing some followers",
+ args: args{
+ activeBrokers: activeBrokers,
+ followerCount: 10,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:2",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ fmt.Printf("%v before %v\n", tt.name, tt.args.assignments)
+ hasChanges := EnsureAssignmentsToActiveBrokers(tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
+ assert.Equalf(t, tt.hasChanges, hasChanges, "EnsureAssignmentsToActiveBrokers(%v, %v, %v)", tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
+ fmt.Printf("%v after %v\n", tt.name, tt.args.assignments)
+ })
+ }
+}