aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-14 09:09:36 -0700
committerchrislu <chris.lu@gmail.com>2024-05-14 09:09:36 -0700
commit972e9faaa2f6fac677b79bcec1ba17321c267b37 (patch)
treee28d3c7354d87b1f9e2f87a26f959f69afb153c0
parent6e5075e14eba59ffdfd7640cb664b2c4da017221 (diff)
downloadseaweedfs-972e9faaa2f6fac677b79bcec1ba17321c267b37.tar.xz
seaweedfs-972e9faaa2f6fac677b79bcec1ba17321c267b37.zip
move initial assignment to rebalance logic
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go13
-rw-r--r--weed/mq/broker/broker_server.go9
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go11
3 files changed, 14 insertions, 19 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 6925baa9e..a1b29f45c 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -4,7 +4,6 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -37,18 +36,6 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
ctx := stream.Context()
go func() {
- // try to load the partition assignment from filer
- if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil {
- // send partition assignment to subscriber
- cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
- Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
- Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
- PartitionAssignments: conf.BrokerPartitionAssignments,
- },
- },
- }
- }
-
// process ack messages
for {
_, err := stream.Recv()
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 510c03558..b3d86a401 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -65,8 +65,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
Coordinator: coordinator,
}
fca := &sub_coordinator.FilerClientAccessor{
- GetFilerFn: mqBroker.GetFiler,
- GrpcDialOption: grpcDialOption,
+ GetFiler: mqBroker.GetFiler,
+ GetGrpcDialOption: mqBroker.GetGrpcDialOption,
}
mqBroker.fca = fca
coordinator.FilerClientAccessor = fca
@@ -126,6 +126,11 @@ func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate,
}
+func (b *MessageQueueBroker) GetGrpcDialOption() grpc.DialOption {
+ return b.grpcDialOption
+}
+
+
func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
return b.currentFiler
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index be87b4105..d298c0c41 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -77,10 +77,13 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke
// collect current topic partitions
partitionSlotToBrokerList := knownPartitionSlotToBrokerList
if partitionSlotToBrokerList == nil {
- var found bool
- partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String())
- if !found {
- glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
+ if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
+ partitionSlotToBrokerList = pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
+ for _, assignment := range conf.BrokerPartitionAssignments {
+ partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker)
+ }
+ } else {
+ glog.V(0).Infof("fail to read topic conf from filer: %v", err)
return
}
}