diff options
Diffstat (limited to 'weed/mq/pub_balancer')
| -rw-r--r-- | weed/mq/pub_balancer/allocate.go | 3 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/allocate_test.go | 17 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/broker_stats.go | 3 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/lookup.go | 5 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/partition_list_broker.go | 4 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/pub_balancer.go | 3 |
6 files changed, 20 insertions, 15 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index ae32188ec..46d423b30 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -4,6 +4,7 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "math/rand" "time" ) @@ -14,7 +15,7 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p rangeSize := MaxPartitionCount / partitionCount for i := int32(0); i < partitionCount; i++ { assignment := &mq_pb.BrokerPartitionAssignment{ - Partition: &mq_pb.Partition{ + Partition: &schema_pb.Partition{ RingSize: MaxPartitionCount, RangeStart: int32(i * rangeSize), RangeStop: int32((i + 1) * rangeSize), diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 63692af0f..fc747634e 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -4,6 +4,7 @@ import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/stretchr/testify/assert" "testing" ) @@ -29,7 +30,7 @@ func Test_allocateOneBroker(t *testing.T) { wantAssignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:17777", - Partition: &mq_pb.Partition{ + Partition: &schema_pb.Partition{ RingSize: MaxPartitionCount, RangeStart: 0, RangeStop: MaxPartitionCount, @@ -96,7 +97,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, FollowerBroker: "localhost:2", }, }, @@ -111,7 +112,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, FollowerBroker: "", }, }, @@ -126,7 +127,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, FollowerBroker: "localhost:200", }, }, @@ -141,7 +142,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:100", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, FollowerBroker: "localhost:200", }, }, @@ -156,7 +157,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, FollowerBroker: "localhost:2", }, }, @@ -171,7 +172,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, }, }, }, @@ -185,7 +186,7 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &schema_pb.Partition{}, FollowerBroker: "localhost:2", }, }, diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index e72703d5f..da016b7fd 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -5,6 +5,7 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) type BrokerStats struct { @@ -65,7 +66,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { bs.SubscriberCount = subscriberCount } -func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) { +func (bs *BrokerStats) RegisterAssignment(t *schema_pb.Topic, partition *schema_pb.Partition, isAdd bool) { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 423b38ecb..5f9c7f32f 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -3,13 +3,14 @@ package pub_balancer import ( "errors" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) var ( ErrNoBroker = errors.New("no broker") ) -func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) { +func (balancer *PubBalancer) LookupTopicPartitions(topic *schema_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) { // find existing topic partition assignments for brokerStatsItem := range balancer.Brokers.IterBuffered() { broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val @@ -18,7 +19,7 @@ func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignme if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && topicPartitionStat.TopicPartition.Name == topic.Name { assignment := &mq_pb.BrokerPartitionAssignment{ - Partition: &mq_pb.Partition{ + Partition: &schema_pb.Partition{ RingSize: MaxPartitionCount, RangeStart: topicPartitionStat.RangeStart, RangeStop: topicPartitionStat.RangeStop, diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go index d084cf74e..34bdfd286 100644 --- a/weed/mq/pub_balancer/partition_list_broker.go +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -2,7 +2,7 @@ package pub_balancer import ( "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) type PartitionSlotToBroker struct { @@ -24,7 +24,7 @@ func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList { } } -func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string, follower string) { +func (ps *PartitionSlotToBrokerList) AddBroker(partition *schema_pb.Partition, broker string, follower string) { for _, partitionSlot := range ps.PartitionSlots { if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop { if partitionSlot.AssignedBroker != "" && partitionSlot.AssignedBroker != broker { diff --git a/weed/mq/pub_balancer/pub_balancer.go b/weed/mq/pub_balancer/pub_balancer.go index 755cc8f30..9457b76fe 100644 --- a/weed/mq/pub_balancer/pub_balancer.go +++ b/weed/mq/pub_balancer/pub_balancer.go @@ -4,6 +4,7 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) const ( @@ -32,7 +33,7 @@ type PubBalancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name - OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) + OnPartitionChange func(topic *schema_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) } func NewPubBalancer() *PubBalancer { |
