aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-01 05:59:40 -0800
committerchrislu <chris.lu@gmail.com>2024-03-01 05:59:40 -0800
commitfe03b1b5228d421f2b9e6903a728aa76866166f1 (patch)
tree8097f8086c9ed7c544c96f0793a8d697c01e5ea4
parent50c5dd7313817a1fbef99b86c9520e449d9b5bc5 (diff)
downloadseaweedfs-fe03b1b5228d421f2b9e6903a728aa76866166f1.tar.xz
seaweedfs-fe03b1b5228d421f2b9e6903a728aa76866166f1.zip
handle single broker case
-rw-r--r--weed/mq/pub_balancer/allocate.go20
-rw-r--r--weed/mq/pub_balancer/allocate_test.go19
2 files changed, 37 insertions, 2 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 12d83d01a..1dd1ef9d8 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -126,14 +126,30 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
i++
hasChanges = true
}
+
+ hasEmptyFollowers := false
j := 0
for ; j<len(assignment.FollowerBrokers); j++ {
if assignment.FollowerBrokers[j] == "" {
- assignment.FollowerBrokers[j] = pickedBrokers[i]
- i++
hasChanges = true
+ if i < len(pickedBrokers) {
+ assignment.FollowerBrokers[j] = pickedBrokers[i]
+ i++
+ } else {
+ hasEmptyFollowers = true
+ }
+ }
+ }
+ if hasEmptyFollowers {
+ var followerBrokers []string
+ for _, follower := range assignment.FollowerBrokers {
+ if follower != "" {
+ followerBrokers = append(followerBrokers, follower)
+ }
}
+ assignment.FollowerBrokers = followerBrokers
}
+
if i < len(pickedBrokers) {
assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...)
hasChanges = true
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 8b0bc3603..b585219dc 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -83,6 +83,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
lowActiveBrokers := cmap.New[*BrokerStats]()
lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
+ singleActiveBroker := cmap.New[*BrokerStats]()
+ singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{})
tests := []struct {
name string
args args
@@ -207,6 +209,23 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
},
hasChanges: false,
},
+ {
+ name: "test single active broker",
+ args: args{
+ activeBrokers: singleActiveBroker,
+ followerCount: 3,
+ assignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
+ FollowerBrokers: []string{
+ "localhost:2",
+ },
+ },
+ },
+ },
+ hasChanges: true,
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {