aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic_allocation/allocation.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-18 16:13:20 -0700
committerchrislu <chris.lu@gmail.com>2023-09-18 16:13:20 -0700
commit0bb97709d41b1be4c74f01dcc65aac6d5f88bd16 (patch)
tree9be51ec65888a0741f63912c9bc125d0278d3360 /weed/mq/topic_allocation/allocation.go
parent8cb42c39adbbe7f082009cf91eccb4d2dc527235 (diff)
downloadseaweedfs-0bb97709d41b1be4c74f01dcc65aac6d5f88bd16.tar.xz
seaweedfs-0bb97709d41b1be4c74f01dcc65aac6d5f88bd16.zip
Revert "Merge branch 'master' into sub"
This reverts commit 4d414f54a224142f3f4d934f4af3b5dceb6fec6b, reversing changes made to 482742514656e9b5a652acf7406740fbc55db13d.
Diffstat (limited to 'weed/mq/topic_allocation/allocation.go')
-rw-r--r--weed/mq/topic_allocation/allocation.go81
1 files changed, 81 insertions, 0 deletions
diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go
new file mode 100644
index 000000000..a07ce4884
--- /dev/null
+++ b/weed/mq/topic_allocation/allocation.go
@@ -0,0 +1,81 @@
+package topic_allocation
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "modernc.org/mathutil"
+)
+
+const (
+ DefaultBrokerCount = 4
+)
+
+// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
+func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
+ // create a previous assignment if not exists
+ if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
+ prevAssignment = &mq_pb.TopicPartitionsAssignment{
+ PartitionCount: topic.PartitionCount,
+ }
+ partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
+ for i := 0; i < DefaultBrokerCount; i++ {
+ prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+ PartitionStart: int32(i * partitionCountForEachBroker),
+ PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
+ })
+ }
+ }
+
+ // create a new assignment
+ assignment = &mq_pb.TopicPartitionsAssignment{
+ PartitionCount: prevAssignment.PartitionCount,
+ }
+
+ // allocate partitions for each partition range
+ for _, brokerPartition := range prevAssignment.BrokerPartitions {
+ // allocate partitions for each partition range
+ leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return nil, err
+ }
+
+ followerBrokers := make([]string, len(followers))
+ for i, follower := range followers {
+ followerBrokers[i] = string(follower)
+ }
+
+ assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+ PartitionStart: brokerPartition.PartitionStart,
+ PartitionStop: brokerPartition.PartitionStop,
+ LeaderBroker: string(leader),
+ FollowerBrokers: followerBrokers,
+ })
+ }
+
+ return
+}
+
+func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
+ // allocate leader
+ leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return
+ }
+
+ // allocate followers
+ followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
+ return
+}
+
+func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
+ return
+}