aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer')
-rw-r--r--weed/mq/pub_balancer/allocate.go3
-rw-r--r--weed/mq/pub_balancer/allocate_test.go17
-rw-r--r--weed/mq/pub_balancer/broker_stats.go3
-rw-r--r--weed/mq/pub_balancer/lookup.go5
-rw-r--r--weed/mq/pub_balancer/partition_list_broker.go4
-rw-r--r--weed/mq/pub_balancer/pub_balancer.go3
6 files changed, 20 insertions, 15 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index ae32188ec..46d423b30 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -4,6 +4,7 @@ import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"math/rand"
"time"
)
@@ -14,7 +15,7 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
rangeSize := MaxPartitionCount / partitionCount
for i := int32(0); i < partitionCount; i++ {
assignment := &mq_pb.BrokerPartitionAssignment{
- Partition: &mq_pb.Partition{
+ Partition: &schema_pb.Partition{
RingSize: MaxPartitionCount,
RangeStart: int32(i * rangeSize),
RangeStop: int32((i + 1) * rangeSize),
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 63692af0f..fc747634e 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -4,6 +4,7 @@ import (
"fmt"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -29,7 +30,7 @@ func Test_allocateOneBroker(t *testing.T) {
wantAssignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:17777",
- Partition: &mq_pb.Partition{
+ Partition: &schema_pb.Partition{
RingSize: MaxPartitionCount,
RangeStart: 0,
RangeStop: MaxPartitionCount,
@@ -96,7 +97,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
FollowerBroker: "localhost:2",
},
},
@@ -111,7 +112,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
FollowerBroker: "",
},
},
@@ -126,7 +127,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
FollowerBroker: "localhost:200",
},
},
@@ -141,7 +142,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:100",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
FollowerBroker: "localhost:200",
},
},
@@ -156,7 +157,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
FollowerBroker: "localhost:2",
},
},
@@ -171,7 +172,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
},
},
},
@@ -185,7 +186,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &schema_pb.Partition{},
FollowerBroker: "localhost:2",
},
},
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index e72703d5f..da016b7fd 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -5,6 +5,7 @@ import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
type BrokerStats struct {
@@ -65,7 +66,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
bs.SubscriberCount = subscriberCount
}
-func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
+func (bs *BrokerStats) RegisterAssignment(t *schema_pb.Topic, partition *schema_pb.Partition, isAdd bool) {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
index 423b38ecb..5f9c7f32f 100644
--- a/weed/mq/pub_balancer/lookup.go
+++ b/weed/mq/pub_balancer/lookup.go
@@ -3,13 +3,14 @@ package pub_balancer
import (
"errors"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
var (
ErrNoBroker = errors.New("no broker")
)
-func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
+func (balancer *PubBalancer) LookupTopicPartitions(topic *schema_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
// find existing topic partition assignments
for brokerStatsItem := range balancer.Brokers.IterBuffered() {
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
@@ -18,7 +19,7 @@ func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignme
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
topicPartitionStat.TopicPartition.Name == topic.Name {
assignment := &mq_pb.BrokerPartitionAssignment{
- Partition: &mq_pb.Partition{
+ Partition: &schema_pb.Partition{
RingSize: MaxPartitionCount,
RangeStart: topicPartitionStat.RangeStart,
RangeStop: topicPartitionStat.RangeStop,
diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go
index d084cf74e..34bdfd286 100644
--- a/weed/mq/pub_balancer/partition_list_broker.go
+++ b/weed/mq/pub_balancer/partition_list_broker.go
@@ -2,7 +2,7 @@ package pub_balancer
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
type PartitionSlotToBroker struct {
@@ -24,7 +24,7 @@ func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
}
}
-func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string, follower string) {
+func (ps *PartitionSlotToBrokerList) AddBroker(partition *schema_pb.Partition, broker string, follower string) {
for _, partitionSlot := range ps.PartitionSlots {
if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
if partitionSlot.AssignedBroker != "" && partitionSlot.AssignedBroker != broker {
diff --git a/weed/mq/pub_balancer/pub_balancer.go b/weed/mq/pub_balancer/pub_balancer.go
index 755cc8f30..9457b76fe 100644
--- a/weed/mq/pub_balancer/pub_balancer.go
+++ b/weed/mq/pub_balancer/pub_balancer.go
@@ -4,6 +4,7 @@ import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
const (
@@ -32,7 +33,7 @@ type PubBalancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
- OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
+ OnPartitionChange func(topic *schema_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
}
func NewPubBalancer() *PubBalancer {