aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer/allocate.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer/allocate.go')
-rw-r--r--weed/mq/pub_balancer/allocate.go54
1 files changed, 54 insertions, 0 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
new file mode 100644
index 000000000..9b2113162
--- /dev/null
+++ b/weed/mq/pub_balancer/allocate.go
@@ -0,0 +1,54 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "math/rand"
+)
+
+func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
+ // divide the ring into partitions
+ rangeSize := MaxPartitionCount / partitionCount
+ for i := int32(0); i < partitionCount; i++ {
+ assignment := &mq_pb.BrokerPartitionAssignment{
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: int32(i * rangeSize),
+ RangeStop: int32((i + 1) * rangeSize),
+ },
+ }
+ if i == partitionCount-1 {
+ assignment.Partition.RangeStop = MaxPartitionCount
+ }
+ assignments = append(assignments, assignment)
+ }
+
+ // pick the brokers
+ pickedBrokers := pickBrokers(brokers, partitionCount)
+
+ // assign the partitions to brokers
+ for i, assignment := range assignments {
+ assignment.LeaderBroker = pickedBrokers[i]
+ }
+ glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
+ return
+}
+
+// for now: randomly pick brokers
+// TODO pick brokers based on the broker stats
+func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
+ candidates := make([]string, 0, brokers.Count())
+ for brokerStatsItem := range brokers.IterBuffered() {
+ candidates = append(candidates, brokerStatsItem.Key)
+ }
+ pickedBrokers := make([]string, 0, count)
+ for i := int32(0); i < count; i++ {
+ p := rand.Int() % len(candidates)
+ if p < 0 {
+ p = -p
+ }
+ pickedBrokers = append(pickedBrokers, candidates[p])
+ }
+ return pickedBrokers
+}