aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-22 11:04:24 -0800
committerchrislu <chris.lu@gmail.com>2024-01-22 11:04:24 -0800
commit428fea45f33ac9807ad5783f394841d59ab49516 (patch)
tree60398d862715482e54a0d49b6b257ecdd1f7363f
parente8b05ecc917464bba42c839ec2ddea7fd3a22e58 (diff)
downloadseaweedfs-428fea45f33ac9807ad5783f394841d59ab49516.tar.xz
seaweedfs-428fea45f33ac9807ad5783f394841d59ab49516.zip
updated and added assignments
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go6
-rw-r--r--weed/mq/pub_balancer/allocate.go17
2 files changed, 14 insertions, 9 deletions
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 397e70fac..710e95b38 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -55,9 +55,9 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid
- changedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)
- if len(changedAssignments) > 0 {
- glog.V(0).Infof("topic %v partition assignments changed: %v", t, changedAssignments)
+ addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)
+ if len(addedAssignments) > 0 || len(updatedAssignments) > 0 {
+ glog.V(0).Infof("topic %v partition assignments added: %v updated: %v", t, addedAssignments, updatedAssignments)
if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
return err
}
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 7822f5ed9..249280cb7 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -56,22 +56,27 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32)
return pickedBrokers
}
-func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (changedAssignments []*mq_pb.BrokerPartitionAssignment) {
+func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) {
for _, assignment := range assignments {
if assignment.LeaderBroker == "" {
- changedAssignments = append(changedAssignments, assignment)
+ addedAssignments = append(addedAssignments, assignment)
continue
}
if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
- changedAssignments = append(changedAssignments, assignment)
+ updatedAssignments = append(updatedAssignments, assignment)
continue
}
}
// pick the brokers with the least number of partitions
- pickedBrokers := pickBrokers(activeBrokers, int32(len(changedAssignments)))
- for i, assignment := range changedAssignments {
+ pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments)))
+ for i, assignment := range addedAssignments {
assignment.LeaderBroker = pickedBrokers[i]
}
- return changedAssignments
+ pickedBrokers = pickBrokers(activeBrokers, int32(len(updatedAssignments)))
+ for i, assignment := range updatedAssignments {
+ assignment.LeaderBroker = pickedBrokers[i]
+ }
+
+ return
}