aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-21 09:56:30 -0700
committerchrislu <chris.lu@gmail.com>2024-05-21 09:56:30 -0700
commitd5abffa42c63a302f07eb910bc1d523c5728f8bb (patch)
tree83cbddca1c7133afe055db1347182bc38daceda8
parentfa98ecf71edcdf1a8b63a92f46a473ad1073b889 (diff)
downloadseaweedfs-d5abffa42c63a302f07eb910bc1d523c5728f8bb.tar.xz
seaweedfs-d5abffa42c63a302f07eb910bc1d523c5728f8bb.zip
rename Balancer to PubBalancer
-rw-r--r--weed/mq/broker/broker_server.go2
-rw-r--r--weed/mq/pub_balancer/balance.go4
-rw-r--r--weed/mq/pub_balancer/balance_action.go6
-rw-r--r--weed/mq/pub_balancer/balancer.go18
-rw-r--r--weed/mq/pub_balancer/lookup.go2
-rw-r--r--weed/mq/pub_balancer/repair.go2
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go4
-rw-r--r--weed/mq/sub_coordinator/coordinator.go4
8 files changed, 21 insertions, 21 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 640ccca10..e381fa84c 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -43,7 +43,7 @@ type MessageQueueBroker struct {
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
- Balancer *pub_balancer.Balancer
+ Balancer *pub_balancer.PubBalancer
lockAsBalancer *cluster.LiveLock
Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex
diff --git a/weed/mq/pub_balancer/balance.go b/weed/mq/pub_balancer/balance.go
index 87fc5739b..b4f1e20cd 100644
--- a/weed/mq/pub_balancer/balance.go
+++ b/weed/mq/pub_balancer/balance.go
@@ -54,12 +54,12 @@ type BalanceActionCreate struct {
// BalancePublishers check the stats of all brokers,
// and balance the publishers to the brokers.
-func (balancer *Balancer) BalancePublishers() []BalanceAction {
+func (balancer *PubBalancer) BalancePublishers() []BalanceAction {
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
return []BalanceAction{action}
}
-func (balancer *Balancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
+func (balancer *PubBalancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
for _, action := range actions {
switch action.(type) {
case *BalanceActionMove:
diff --git a/weed/mq/pub_balancer/balance_action.go b/weed/mq/pub_balancer/balance_action.go
index c29ec3469..a2d888b2a 100644
--- a/weed/mq/pub_balancer/balance_action.go
+++ b/weed/mq/pub_balancer/balance_action.go
@@ -8,10 +8,10 @@ import (
"google.golang.org/grpc"
)
-// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish()
-// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish()
+// PubBalancer <= PublisherToPubBalancer() <= Broker <=> Publish()
+// ExecuteBalanceActionMove from PubBalancer => AssignTopicPartitions() => Broker => Publish()
-func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error {
+func (balancer *PubBalancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error {
if _, found := balancer.Brokers.Get(move.SourceBroker); !found {
return fmt.Errorf("source broker %s not found", move.SourceBroker)
}
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
index 5b5562d2c..1c8e31558 100644
--- a/weed/mq/pub_balancer/balancer.go
+++ b/weed/mq/pub_balancer/balancer.go
@@ -11,7 +11,7 @@ const (
LockBrokerBalancer = "broker_balancer"
)
-// Balancer collects stats from all brokers.
+// PubBalancer collects stats from all brokers.
//
// When publishers wants to create topics, it picks brokers to assign the topic partitions.
// When consumers wants to subscribe topics, it tells which brokers are serving the topic partitions.
@@ -28,7 +28,7 @@ const (
//
// When a consumer instance is down, the broker will notice this and inform the balancer.
// The balancer will then tell the broker to send the partition to another standby consumer instance.
-type Balancer struct {
+type PubBalancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
@@ -37,14 +37,14 @@ type Balancer struct {
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
}
-func NewBalancer() *Balancer {
- return &Balancer{
+func NewBalancer() *PubBalancer {
+ return &PubBalancer{
Brokers: cmap.New[*BrokerStats](),
TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
}
}
-func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
+func (balancer *PubBalancer) AddBroker(broker string) (brokerStats *BrokerStats) {
var found bool
brokerStats, found = balancer.Brokers.Get(broker)
if !found {
@@ -58,7 +58,7 @@ func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
return brokerStats
}
-func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
+func (balancer *PubBalancer) RemoveBroker(broker string, stats *BrokerStats) {
balancer.Brokers.Remove(broker)
// update TopicToBrokers
@@ -78,7 +78,7 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
balancer.OnRemoveBroker(broker, stats)
}
-func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
+func (balancer *PubBalancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
brokerStats.UpdateStats(receivedStats)
// update TopicToBrokers
@@ -97,9 +97,9 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke
}
// OnPubAddBroker is called when a broker is added for a publisher coordinator
-func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
+func (balancer *PubBalancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
}
// OnPubRemoveBroker is called when a broker is removed for a publisher coordinator
-func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
+func (balancer *PubBalancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
}
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
index 052932c04..423b38ecb 100644
--- a/weed/mq/pub_balancer/lookup.go
+++ b/weed/mq/pub_balancer/lookup.go
@@ -9,7 +9,7 @@ var (
ErrNoBroker = errors.New("no broker")
)
-func (balancer *Balancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
+func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
// find existing topic partition assignments
for brokerStatsItem := range balancer.Brokers.IterBuffered() {
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go
index 0f307c9eb..d16715406 100644
--- a/weed/mq/pub_balancer/repair.go
+++ b/weed/mq/pub_balancer/repair.go
@@ -8,7 +8,7 @@ import (
"sort"
)
-func (balancer *Balancer) RepairTopics() []BalanceAction {
+func (balancer *PubBalancer) RepairTopics() []BalanceAction {
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
return []BalanceAction{action}
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 72a8e9aad..1e5c7c79c 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -22,11 +22,11 @@ type ConsumerGroup struct {
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
mapping *PartitionConsumerMapping
reBalanceTimer *time.Timer
- pubBalancer *pub_balancer.Balancer
+ pubBalancer *pub_balancer.PubBalancer
filerClientAccessor *FilerClientAccessor
}
-func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
+func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.PubBalancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
return &ConsumerGroup{
topic: topic.FromPbTopic(t),
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 4bb726f26..6ab7166d4 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -18,11 +18,11 @@ type TopicConsumerGroups struct {
type Coordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
- balancer *pub_balancer.Balancer
+ balancer *pub_balancer.PubBalancer
FilerClientAccessor *FilerClientAccessor
}
-func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
+func NewCoordinator(balancer *pub_balancer.PubBalancer) *Coordinator {
return &Coordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,