aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/balancer/allocate.go46
-rw-r--r--weed/mq/balancer/allocate_test.go5
-rw-r--r--weed/mq/balancer/lookup.go10
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go4
4 files changed, 52 insertions, 13 deletions
diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go
index d594c60fb..f7b17ab4b 100644
--- a/weed/mq/balancer/allocate.go
+++ b/weed/mq/balancer/allocate.go
@@ -3,18 +3,50 @@ package balancer
import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "math/rand"
)
func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) {
- return []*mq_pb.BrokerPartitionAssignment{
- {
- LeaderBroker: "localhost:17777",
- FollowerBrokers: []string{"localhost:17777"},
+ // divide the ring into partitions
+ rangeSize := MaxPartitionCount / partitionCount
+ for i := 0; i < partitionCount; i++ {
+ assignment := &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount,
- RangeStart: 0,
- RangeStop: MaxPartitionCount,
+ RangeStart: int32(i * rangeSize),
+ RangeStop: int32((i + 1) * rangeSize),
},
- },
+ }
+ if i == partitionCount-1 {
+ assignment.Partition.RangeStop = MaxPartitionCount
+ }
+ assignments = append(assignments, assignment)
}
+
+ // pick the brokers
+ pickedBrokers := pickBrokers(brokers, partitionCount)
+
+ // assign the partitions to brokers
+ for i, assignment := range assignments {
+ assignment.LeaderBroker = pickedBrokers[i]
+ }
+ return
+}
+
+// for now: randomly pick brokers
+// TODO pick brokers based on the broker stats
+func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int) []string {
+ candidates := make([]string, 0, brokers.Count())
+ for brokerStatsItem := range brokers.IterBuffered() {
+ candidates = append(candidates, brokerStatsItem.Key)
+ }
+ pickedBrokers := make([]string, 0, count)
+ for i := 0; i < count; i++ {
+ p := rand.Int() % len(candidates)
+ if p < 0 {
+ p = -p
+ }
+ pickedBrokers = append(pickedBrokers, candidates[p])
+ }
+ return pickedBrokers
}
diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go
index c714788e6..cb87d4ec3 100644
--- a/weed/mq/balancer/allocate_test.go
+++ b/weed/mq/balancer/allocate_test.go
@@ -24,12 +24,11 @@ func Test_allocateOneBroker(t *testing.T) {
name: "test only one broker",
args: args{
brokers: brokers,
- partitionCount: 6,
+ partitionCount: 1,
},
wantAssignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "localhost:17777",
- FollowerBrokers: []string{"localhost:17777"},
+ LeaderBroker: "localhost:17777",
Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount,
RangeStart: 0,
diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go
index 55ed3e95d..f906a7d74 100644
--- a/weed/mq/balancer/lookup.go
+++ b/weed/mq/balancer/lookup.go
@@ -1,10 +1,15 @@
package balancer
import (
+ "errors"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
-func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+var (
+ ErrNoBroker = errors.New("no broker")
+)
+
+func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
// find existing topic partition assignments
for brokerStatsItem := range b.Brokers.IterBuffered() {
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
@@ -39,5 +44,8 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b
// if the request is_for_subscribe
// return error not found
// t := topic.FromPbTopic(request.Topic)
+ if b.Brokers.IsEmpty() {
+ return nil, ErrNoBroker
+ }
return allocateTopicPartitions(b.Brokers, 6), nil
}
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 9dda9b7d1..5bc7068b2 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -23,7 +23,7 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p
}
ret := &mq_pb.CreateTopicResponse{}
- ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true)
+ ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
return ret, err
}
@@ -56,7 +56,7 @@ func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, reques
ret := &mq_pb.LookupTopicBrokersResponse{}
ret.Topic = request.Topic
- ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish)
+ ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6)
return ret, err
}