diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/balancer/allocate.go | 20 | ||||
| -rw-r--r-- | weed/mq/balancer/allocate_test.go | 62 | ||||
| -rw-r--r-- | weed/mq/balancer/balance.go | 26 | ||||
| -rw-r--r-- | weed/mq/balancer/balancer.go | 4 | ||||
| -rw-r--r-- | weed/mq/balancer/lookup.go | 43 | ||||
| -rw-r--r-- | weed/mq/topic_allocation/allocation.go | 81 |
6 files changed, 128 insertions, 108 deletions
diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go new file mode 100644 index 000000000..d594c60fb --- /dev/null +++ b/weed/mq/balancer/allocate.go @@ -0,0 +1,20 @@ +package balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { + return []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + } +} diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go new file mode 100644 index 000000000..c714788e6 --- /dev/null +++ b/weed/mq/balancer/allocate_test.go @@ -0,0 +1,62 @@ +package balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "reflect" + "testing" +) + +func Test_allocateOneBroker(t *testing.T) { + brokers := cmap.New[*BrokerStats]() + brokers.SetIfAbsent("localhost:17777", &BrokerStats{ + TopicPartitionCount: 0, + ConsumerCount: 0, + CpuUsagePercent: 0, + }) + + tests := []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment + }{ + { + name: "test only one broker", + args: args{ + brokers: brokers, + partitionCount: 6, + }, + wantAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + }, + }, + } + testThem(t, tests) +} + +type args struct { + brokers cmap.ConcurrentMap[string, *BrokerStats] + partitionCount int +} + +func testThem(t *testing.T, tests []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment +}) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { + t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) + } + }) + } +} diff --git a/weed/mq/balancer/balance.go b/weed/mq/balancer/balance.go deleted file mode 100644 index 5be968399..000000000 --- a/weed/mq/balancer/balance.go +++ /dev/null @@ -1,26 +0,0 @@ -package balancer - -import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - -func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) { - // TODO lock the topic - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - // t := topic.FromPbTopic(request.Topic) - return []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, - }, - }, - }, nil -} diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 1aa47831e..837dc0ce3 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -7,7 +7,7 @@ import ( ) const ( - MaxPartitionCount = 1024 + MaxPartitionCount = 8 * 9 * 5 * 7 //2520 ) type Balancer struct { @@ -36,6 +36,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { RangeStop: topicPartitionStats.Partition.RangeStop, }, ConsumerCount: topicPartitionStats.ConsumerCount, + IsLeader: topicPartitionStats.IsLeader, } consumerCount += topicPartitionStats.ConsumerCount key := tps.TopicPartition.String() @@ -60,6 +61,7 @@ type TopicPartition struct { type TopicPartitionStats struct { TopicPartition ConsumerCount int32 + IsLeader bool } func NewBalancer() *Balancer { diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go new file mode 100644 index 000000000..55ed3e95d --- /dev/null +++ b/weed/mq/balancer/lookup.go @@ -0,0 +1,43 @@ +package balancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { + // find existing topic partition assignments + for brokerStatsItem := range b.Brokers.IterBuffered() { + broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val + for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { + topicPartitionStat := topicPartitionStatsItem.Val + if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && + topicPartitionStat.TopicPartition.Topic == topic.Name { + assignment := &mq_pb.BrokerPartitionAssignment{ + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: topicPartitionStat.RangeStart, + RangeStop: topicPartitionStat.RangeStop, + }, + } + if topicPartitionStat.IsLeader { + assignment.LeaderBroker = broker + } else { + assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) + } + assignments = append(assignments, assignment) + } + } + } + if len(assignments) > 0 { + return assignments, nil + } + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + // t := topic.FromPbTopic(request.Topic) + return allocateTopicPartitions(b.Brokers, 6), nil +} diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go deleted file mode 100644 index a07ce4884..000000000 --- a/weed/mq/topic_allocation/allocation.go +++ /dev/null @@ -1,81 +0,0 @@ -package topic_allocation - -import ( - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "modernc.org/mathutil" -) - -const ( - DefaultBrokerCount = 4 -) - -// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions -func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) { - // create a previous assignment if not exists - if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 { - prevAssignment = &mq_pb.TopicPartitionsAssignment{ - PartitionCount: topic.PartitionCount, - } - partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount - for i := 0; i < DefaultBrokerCount; i++ { - prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ - PartitionStart: int32(i * partitionCountForEachBroker), - PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount), - }) - } - } - - // create a new assignment - assignment = &mq_pb.TopicPartitionsAssignment{ - PartitionCount: prevAssignment.PartitionCount, - } - - // allocate partitions for each partition range - for _, brokerPartition := range prevAssignment.BrokerPartitions { - // allocate partitions for each partition range - leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers) - if err != nil { - return nil, err - } - - followerBrokers := make([]string, len(followers)) - for i, follower := range followers { - followerBrokers[i] = string(follower) - } - - assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ - PartitionStart: brokerPartition.PartitionStart, - PartitionStop: brokerPartition.PartitionStop, - LeaderBroker: string(leader), - FollowerBrokers: followerBrokers, - }) - } - - return -} - -func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) { - // allocate leader - leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers) - if err != nil { - return - } - - // allocate followers - followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers) - if err != nil { - return - } - - return -} - -func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) { - return -} - -func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) { - return -} |
