aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/balancer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/balancer')
-rw-r--r--weed/mq/balancer/allocate.go20
-rw-r--r--weed/mq/balancer/allocate_test.go62
-rw-r--r--weed/mq/balancer/balance.go26
-rw-r--r--weed/mq/balancer/balancer.go4
-rw-r--r--weed/mq/balancer/lookup.go43
5 files changed, 128 insertions, 27 deletions
diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go
new file mode 100644
index 000000000..d594c60fb
--- /dev/null
+++ b/weed/mq/balancer/allocate.go
@@ -0,0 +1,20 @@
+package balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) {
+ return []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:17777",
+ FollowerBrokers: []string{"localhost:17777"},
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: 0,
+ RangeStop: MaxPartitionCount,
+ },
+ },
+ }
+}
diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go
new file mode 100644
index 000000000..c714788e6
--- /dev/null
+++ b/weed/mq/balancer/allocate_test.go
@@ -0,0 +1,62 @@
+package balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "reflect"
+ "testing"
+)
+
+func Test_allocateOneBroker(t *testing.T) {
+ brokers := cmap.New[*BrokerStats]()
+ brokers.SetIfAbsent("localhost:17777", &BrokerStats{
+ TopicPartitionCount: 0,
+ ConsumerCount: 0,
+ CpuUsagePercent: 0,
+ })
+
+ tests := []struct {
+ name string
+ args args
+ wantAssignments []*mq_pb.BrokerPartitionAssignment
+ }{
+ {
+ name: "test only one broker",
+ args: args{
+ brokers: brokers,
+ partitionCount: 6,
+ },
+ wantAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:17777",
+ FollowerBrokers: []string{"localhost:17777"},
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: 0,
+ RangeStop: MaxPartitionCount,
+ },
+ },
+ },
+ },
+ }
+ testThem(t, tests)
+}
+
+type args struct {
+ brokers cmap.ConcurrentMap[string, *BrokerStats]
+ partitionCount int
+}
+
+func testThem(t *testing.T, tests []struct {
+ name string
+ args args
+ wantAssignments []*mq_pb.BrokerPartitionAssignment
+}) {
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) {
+ t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments)
+ }
+ })
+ }
+}
diff --git a/weed/mq/balancer/balance.go b/weed/mq/balancer/balance.go
deleted file mode 100644
index 5be968399..000000000
--- a/weed/mq/balancer/balance.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package balancer
-
-import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-
-func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) {
- // TODO lock the topic
-
- // find the topic partitions on the filer
- // if the topic is not found
- // if the request is_for_publish
- // create the topic
- // if the request is_for_subscribe
- // return error not found
- // t := topic.FromPbTopic(request.Topic)
- return []*mq_pb.BrokerPartitionAssignment{
- {
- LeaderBroker: "localhost:17777",
- FollowerBrokers: []string{"localhost:17777"},
- Partition: &mq_pb.Partition{
- RingSize: MaxPartitionCount,
- RangeStart: 0,
- RangeStop: MaxPartitionCount,
- },
- },
- }, nil
-}
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 1aa47831e..837dc0ce3 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -7,7 +7,7 @@ import (
)
const (
- MaxPartitionCount = 1024
+ MaxPartitionCount = 8 * 9 * 5 * 7 //2520
)
type Balancer struct {
@@ -36,6 +36,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
RangeStop: topicPartitionStats.Partition.RangeStop,
},
ConsumerCount: topicPartitionStats.ConsumerCount,
+ IsLeader: topicPartitionStats.IsLeader,
}
consumerCount += topicPartitionStats.ConsumerCount
key := tps.TopicPartition.String()
@@ -60,6 +61,7 @@ type TopicPartition struct {
type TopicPartitionStats struct {
TopicPartition
ConsumerCount int32
+ IsLeader bool
}
func NewBalancer() *Balancer {
diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go
new file mode 100644
index 000000000..55ed3e95d
--- /dev/null
+++ b/weed/mq/balancer/lookup.go
@@ -0,0 +1,43 @@
+package balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+ // find existing topic partition assignments
+ for brokerStatsItem := range b.Brokers.IterBuffered() {
+ broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
+ for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
+ topicPartitionStat := topicPartitionStatsItem.Val
+ if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
+ topicPartitionStat.TopicPartition.Topic == topic.Name {
+ assignment := &mq_pb.BrokerPartitionAssignment{
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: topicPartitionStat.RangeStart,
+ RangeStop: topicPartitionStat.RangeStop,
+ },
+ }
+ if topicPartitionStat.IsLeader {
+ assignment.LeaderBroker = broker
+ } else {
+ assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker)
+ }
+ assignments = append(assignments, assignment)
+ }
+ }
+ }
+ if len(assignments) > 0 {
+ return assignments, nil
+ }
+
+ // find the topic partitions on the filer
+ // if the topic is not found
+ // if the request is_for_publish
+ // create the topic
+ // if the request is_for_subscribe
+ // return error not found
+ // t := topic.FromPbTopic(request.Topic)
+ return allocateTopicPartitions(b.Brokers, 6), nil
+}