aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic_allocation/allocation.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-18 18:47:34 -0700
committerchrislu <chris.lu@gmail.com>2023-09-18 18:47:34 -0700
commit27af11f1e80d738ed495dfca9766c913fd67a7ca (patch)
treea4a54f1e1211e273bede5ecc39ed9513316c596d /weed/mq/topic_allocation/allocation.go
parent0bb97709d41b1be4c74f01dcc65aac6d5f88bd16 (diff)
downloadseaweedfs-27af11f1e80d738ed495dfca9766c913fd67a7ca.tar.xz
seaweedfs-27af11f1e80d738ed495dfca9766c913fd67a7ca.zip
Revert "Revert "Merge branch 'master' into sub""
This reverts commit 0bb97709d41b1be4c74f01dcc65aac6d5f88bd16.
Diffstat (limited to 'weed/mq/topic_allocation/allocation.go')
-rw-r--r--weed/mq/topic_allocation/allocation.go81
1 files changed, 0 insertions, 81 deletions
diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go
deleted file mode 100644
index a07ce4884..000000000
--- a/weed/mq/topic_allocation/allocation.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package topic_allocation
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "modernc.org/mathutil"
-)
-
-const (
- DefaultBrokerCount = 4
-)
-
-// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
-func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
- // create a previous assignment if not exists
- if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
- prevAssignment = &mq_pb.TopicPartitionsAssignment{
- PartitionCount: topic.PartitionCount,
- }
- partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
- for i := 0; i < DefaultBrokerCount; i++ {
- prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
- PartitionStart: int32(i * partitionCountForEachBroker),
- PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
- })
- }
- }
-
- // create a new assignment
- assignment = &mq_pb.TopicPartitionsAssignment{
- PartitionCount: prevAssignment.PartitionCount,
- }
-
- // allocate partitions for each partition range
- for _, brokerPartition := range prevAssignment.BrokerPartitions {
- // allocate partitions for each partition range
- leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
- if err != nil {
- return nil, err
- }
-
- followerBrokers := make([]string, len(followers))
- for i, follower := range followers {
- followerBrokers[i] = string(follower)
- }
-
- assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
- PartitionStart: brokerPartition.PartitionStart,
- PartitionStop: brokerPartition.PartitionStop,
- LeaderBroker: string(leader),
- FollowerBrokers: followerBrokers,
- })
- }
-
- return
-}
-
-func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
- // allocate leader
- leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
- if err != nil {
- return
- }
-
- // allocate followers
- followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
- if err != nil {
- return
- }
-
- return
-}
-
-func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
- return
-}
-
-func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
- return
-}