aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-21 10:02:07 -0700
committerchrislu <chris.lu@gmail.com>2024-05-21 10:02:07 -0700
commit554ae09f8236206e121c2b229eaa96e0858832ed (patch)
treeccc338d611b3640a4df1ebb6c730c2469e3202ea
parent6ef2d010aadbb69efa5b7164283b51239d30a5e7 (diff)
downloadseaweedfs-554ae09f8236206e121c2b229eaa96e0858832ed.tar.xz
seaweedfs-554ae09f8236206e121c2b229eaa96e0858832ed.zip
rename
-rw-r--r--weed/mq/broker/broker_grpc_assign.go6
-rw-r--r--weed/mq/broker/broker_grpc_balance.go4
-rw-r--r--weed/mq/broker/broker_grpc_configure.go6
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go2
-rw-r--r--weed/mq/broker/broker_grpc_pub_balancer.go6
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go4
-rw-r--r--weed/mq/broker/broker_server.go24
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go2
-rw-r--r--weed/mq/pub_balancer/pub_balancer.go2
-rw-r--r--weed/mq/sub_coordinator/sub_coordinator.go2
10 files changed, 29 insertions, 29 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 352a6f69b..48ec0d5bd 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -79,11 +79,11 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context,
return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
}
}
- brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ brokerStats, found := b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
if !found {
brokerStats = pub_balancer.NewBrokerStats()
- if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
- brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ if !b.PubBalancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+ brokerStats, _ = b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
}
}
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go
index 412407211..54634c9d1 100644
--- a/weed/mq/broker/broker_grpc_balance.go
+++ b/weed/mq/broker/broker_grpc_balance.go
@@ -20,8 +20,8 @@ func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.B
ret := &mq_pb.BalanceTopicsResponse{}
- actions := b.Balancer.BalancePublishers()
- err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
+ actions := b.PubBalancer.BalancePublishers()
+ err = b.PubBalancer.ExecuteBalanceAction(actions, b.grpcDialOption)
return ret, err
}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 7f8afab03..7222c8359 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -60,10 +60,10 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
}
resp = &mq_pb.ConfigureTopicResponse{}
- if b.Balancer.Brokers.IsEmpty() {
+ if b.PubBalancer.Brokers.IsEmpty() {
return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
}
- resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
+ resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount)
resp.RecordType = request.RecordType
// save the topic configuration on filer
@@ -71,7 +71,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, fmt.Errorf("configure topic: %v", err)
}
- b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
+ b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index da2c64dfc..db62fd88a 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -50,7 +50,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
ret := &mq_pb.ListTopicsResponse{}
knownTopics := make(map[string]struct{})
- for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() {
+ for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() {
_, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val
diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go
index b8675caca..5978d2173 100644
--- a/weed/mq/broker/broker_grpc_pub_balancer.go
+++ b/weed/mq/broker/broker_grpc_pub_balancer.go
@@ -22,12 +22,12 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
initMessage := req.GetInit()
var brokerStats *pub_balancer.BrokerStats
if initMessage != nil {
- brokerStats = b.Balancer.AddBroker(initMessage.Broker)
+ brokerStats = b.PubBalancer.AddBroker(initMessage.Broker)
} else {
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
}
defer func() {
- b.Balancer.RemoveBroker(initMessage.Broker, brokerStats)
+ b.PubBalancer.RemoveBroker(initMessage.Broker, brokerStats)
}()
// process stats message
@@ -40,7 +40,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
if receivedStats := req.GetStats(); receivedStats != nil {
- b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
+ b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
// glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
}
}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index a1b29f45c..266859c24 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -23,13 +23,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// process init message
initMessage := req.GetInit()
if initMessage != nil {
- cgi = b.Coordinator.AddSubscriber(initMessage)
+ cgi = b.SubCoordinator.AddSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
}
defer func() {
- b.Coordinator.RemoveSubscriber(initMessage)
+ b.SubCoordinator.RemoveSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}()
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 2e449083a..cdf652294 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -43,17 +43,17 @@ type MessageQueueBroker struct {
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
- Balancer *pub_balancer.PubBalancer
- lockAsBalancer *cluster.LiveLock
- Coordinator *sub_coordinator.SubCoordinator
- accessLock sync.Mutex
+ PubBalancer *pub_balancer.PubBalancer
+ lockAsBalancer *cluster.LiveLock
+ SubCoordinator *sub_coordinator.SubCoordinator
+ accessLock sync.Mutex
fca *sub_coordinator.FilerClientAccessor
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
- pub_broker_balancer := pub_balancer.NewBalancer()
- coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer)
+ pubBalancer := pub_balancer.NewPubBalancer()
+ subCoordinator := sub_coordinator.NewSubCoordinator(pubBalancer)
mqBroker = &MessageQueueBroker{
option: option,
@@ -61,20 +61,20 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
- Balancer: pub_broker_balancer,
- Coordinator: coordinator,
+ PubBalancer: pubBalancer,
+ SubCoordinator: subCoordinator,
}
fca := &sub_coordinator.FilerClientAccessor{
GetFiler: mqBroker.GetFiler,
GetGrpcDialOption: mqBroker.GetGrpcDialOption,
}
mqBroker.fca = fca
- coordinator.FilerClientAccessor = fca
+ subCoordinator.FilerClientAccessor = fca
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
- pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
- pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
- pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
+ pubBalancer.OnPartitionChange = mqBroker.SubCoordinator.OnPartitionChange
+ pubBalancer.OnAddBroker = mqBroker.SubCoordinator.OnSubAddBroker
+ pubBalancer.OnRemoveBroker = mqBroker.SubCoordinator.OnSubRemoveBroker
go mqBroker.MasterClient.KeepConnectedToMaster()
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 211473dad..ea5cb71b9 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -52,7 +52,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid
- hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
+ hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
if hasChanges {
glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
diff --git a/weed/mq/pub_balancer/pub_balancer.go b/weed/mq/pub_balancer/pub_balancer.go
index 1c8e31558..3499d2233 100644
--- a/weed/mq/pub_balancer/pub_balancer.go
+++ b/weed/mq/pub_balancer/pub_balancer.go
@@ -37,7 +37,7 @@ type PubBalancer struct {
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
}
-func NewBalancer() *PubBalancer {
+func NewPubBalancer() *PubBalancer {
return &PubBalancer{
Brokers: cmap.New[*BrokerStats](),
TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
diff --git a/weed/mq/sub_coordinator/sub_coordinator.go b/weed/mq/sub_coordinator/sub_coordinator.go
index f78e8c849..b03397ad8 100644
--- a/weed/mq/sub_coordinator/sub_coordinator.go
+++ b/weed/mq/sub_coordinator/sub_coordinator.go
@@ -22,7 +22,7 @@ type SubCoordinator struct {
FilerClientAccessor *FilerClientAccessor
}
-func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
+func NewSubCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
return &SubCoordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,