diff options
Diffstat (limited to 'weed/mq/balancer')
| -rw-r--r-- | weed/mq/balancer/allocate.go | 20 | ||||
| -rw-r--r-- | weed/mq/balancer/allocate_test.go | 62 | ||||
| -rw-r--r-- | weed/mq/balancer/balance.go | 26 | ||||
| -rw-r--r-- | weed/mq/balancer/balancer.go | 4 | ||||
| -rw-r--r-- | weed/mq/balancer/lookup.go | 43 |
5 files changed, 128 insertions, 27 deletions
diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go new file mode 100644 index 000000000..d594c60fb --- /dev/null +++ b/weed/mq/balancer/allocate.go @@ -0,0 +1,20 @@ +package balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { + return []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + } +} diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go new file mode 100644 index 000000000..c714788e6 --- /dev/null +++ b/weed/mq/balancer/allocate_test.go @@ -0,0 +1,62 @@ +package balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "reflect" + "testing" +) + +func Test_allocateOneBroker(t *testing.T) { + brokers := cmap.New[*BrokerStats]() + brokers.SetIfAbsent("localhost:17777", &BrokerStats{ + TopicPartitionCount: 0, + ConsumerCount: 0, + CpuUsagePercent: 0, + }) + + tests := []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment + }{ + { + name: "test only one broker", + args: args{ + brokers: brokers, + partitionCount: 6, + }, + wantAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + }, + }, + } + testThem(t, tests) +} + +type args struct { + brokers cmap.ConcurrentMap[string, *BrokerStats] + partitionCount int +} + +func testThem(t *testing.T, tests []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment +}) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { + t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) + } + }) + } +} diff --git a/weed/mq/balancer/balance.go b/weed/mq/balancer/balance.go deleted file mode 100644 index 5be968399..000000000 --- a/weed/mq/balancer/balance.go +++ /dev/null @@ -1,26 +0,0 @@ -package balancer - -import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - -func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) { - // TODO lock the topic - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - // t := topic.FromPbTopic(request.Topic) - return []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, - }, - }, - }, nil -} diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 1aa47831e..837dc0ce3 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -7,7 +7,7 @@ import ( ) const ( - MaxPartitionCount = 1024 + MaxPartitionCount = 8 * 9 * 5 * 7 //2520 ) type Balancer struct { @@ -36,6 +36,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { RangeStop: topicPartitionStats.Partition.RangeStop, }, ConsumerCount: topicPartitionStats.ConsumerCount, + IsLeader: topicPartitionStats.IsLeader, } consumerCount += topicPartitionStats.ConsumerCount key := tps.TopicPartition.String() @@ -60,6 +61,7 @@ type TopicPartition struct { type TopicPartitionStats struct { TopicPartition ConsumerCount int32 + IsLeader bool } func NewBalancer() *Balancer { diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go new file mode 100644 index 000000000..55ed3e95d --- /dev/null +++ b/weed/mq/balancer/lookup.go @@ -0,0 +1,43 @@ +package balancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { + // find existing topic partition assignments + for brokerStatsItem := range b.Brokers.IterBuffered() { + broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val + for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { + topicPartitionStat := topicPartitionStatsItem.Val + if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && + topicPartitionStat.TopicPartition.Topic == topic.Name { + assignment := &mq_pb.BrokerPartitionAssignment{ + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: topicPartitionStat.RangeStart, + RangeStop: topicPartitionStat.RangeStop, + }, + } + if topicPartitionStat.IsLeader { + assignment.LeaderBroker = broker + } else { + assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) + } + assignments = append(assignments, assignment) + } + } + } + if len(assignments) > 0 { + return assignments, nil + } + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + // t := topic.FromPbTopic(request.Topic) + return allocateTopicPartitions(b.Brokers, 6), nil +} |
