aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-14 08:50:17 -0700
committerchrislu <chris.lu@gmail.com>2024-05-14 08:50:17 -0700
commit6e5075e14eba59ffdfd7640cb664b2c4da017221 (patch)
treecc14ca14a007e9bd66fafe186d702c8822995393
parentac63f2b5a1d887cbc71c7017c85c3cd6ab964706 (diff)
downloadseaweedfs-6e5075e14eba59ffdfd7640cb664b2c4da017221.tar.xz
seaweedfs-6e5075e14eba59ffdfd7640cb664b2c4da017221.zip
move read write topic config into filer client accessor
-rw-r--r--weed/mq/broker/broker_grpc_configure.go4
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go2
-rw-r--r--weed/mq/broker/broker_server.go8
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go52
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go4
-rw-r--r--weed/mq/sub_coordinator/coordinator.go3
-rw-r--r--weed/mq/sub_coordinator/filer_client_accessor.go65
7 files changed, 83 insertions, 55 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 40ac8df23..b177786f5 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
t := topic.FromPbTopic(request.Topic)
var readErr, assignErr error
- resp, readErr = b.readTopicConfFromFiler(t)
+ resp, readErr = b.fca.ReadTopicConfFromFiler(t)
if readErr != nil {
glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr)
}
@@ -68,7 +68,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
resp.RecordType = request.RecordType
// save the topic configuration on filer
- if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {
+ if err := b.fca.SaveTopicConfToFiler(request.Topic, resp); err != nil {
return nil, fmt.Errorf("configure topic: %v", err)
}
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 14c1f37da..da2c64dfc 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -26,7 +26,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
ret := &mq_pb.LookupTopicBrokersResponse{}
conf := &mq_pb.ConfigureTopicResponse{}
ret.Topic = request.Topic
- if conf, err = b.readTopicConfFromFiler(t); err != nil {
+ if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
} else {
err = b.ensureTopicActiveAssignments(t, conf)
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 9c321744b..510c03558 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -47,6 +47,7 @@ type MessageQueueBroker struct {
lockAsBalancer *cluster.LiveLock
Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex
+ fca *sub_coordinator.FilerClientAccessor
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -63,6 +64,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
Balancer: pub_broker_balancer,
Coordinator: coordinator,
}
+ fca := &sub_coordinator.FilerClientAccessor{
+ GetFilerFn: mqBroker.GetFiler,
+ GrpcDialOption: grpcDialOption,
+ }
+ mqBroker.fca = fca
+ coordinator.FilerClientAccessor = fca
+
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index cddd6cf1c..211473dad 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -1,64 +1,16 @@
package broker
import (
- "bytes"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- jsonpb "google.golang.org/protobuf/encoding/protojson"
)
-func (b *MessageQueueBroker) saveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error {
-
- glog.V(0).Infof("save conf for topic %v to filer", t)
-
- // save the topic configuration on filer
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- if err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- var buf bytes.Buffer
- filer.ProtoToText(&buf, conf)
- return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
- }); err != nil {
- return fmt.Errorf("save topic to %s: %v", topicDir, err)
- }
- return nil
-}
-
-// readTopicConfFromFiler reads the topic configuration from filer
-// this should only be run in broker leader, to ensure correct active broker list.
-func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
-
- glog.V(0).Infof("load conf for topic %v from filer", t)
-
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
- if err == filer_pb.ErrNotFound {
- return err
- }
- if err != nil {
- return fmt.Errorf("read topic.conf of %v: %v", t, err)
- }
- // parse into filer conf object
- conf = &mq_pb.ConfigureTopicResponse{}
- if err = jsonpb.Unmarshal(data, conf); err != nil {
- return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
- }
- return nil
- }); err != nil {
- return nil, err
- }
-
- return conf, nil
-}
-
func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
// get or generate a local partition
- conf, readConfErr := b.readTopicConfFromFiler(t)
+ conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
if readConfErr != nil {
glog.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
@@ -103,7 +55,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m
hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
if hasChanges {
glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
- if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
+ if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
return err
}
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 020f459b6..be87b4105 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -23,14 +23,16 @@ type ConsumerGroup struct {
mapping *PartitionConsumerMapping
reBalanceTimer *time.Timer
pubBalancer *pub_balancer.Balancer
+ filerClientAccessor *FilerClientAccessor
}
-func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup {
+func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
return &ConsumerGroup{
topic: topic.FromPbTopic(t),
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
pubBalancer: pubBalancer,
+ filerClientAccessor: filerClientAccessor,
}
}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 62c44fd48..a27391da6 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -19,6 +19,7 @@ type Coordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.Balancer
+ FilerClientAccessor *FilerClientAccessor
}
func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
@@ -55,7 +56,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato
tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil {
- cg = NewConsumerGroup(initMessage.Topic, c.balancer)
+ cg = NewConsumerGroup(initMessage.Topic, c.balancer, c.FilerClientAccessor)
if !tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg) {
cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
}
diff --git a/weed/mq/sub_coordinator/filer_client_accessor.go b/weed/mq/sub_coordinator/filer_client_accessor.go
new file mode 100644
index 000000000..85bb5e29d
--- /dev/null
+++ b/weed/mq/sub_coordinator/filer_client_accessor.go
@@ -0,0 +1,65 @@
+package sub_coordinator
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc"
+ jsonpb "google.golang.org/protobuf/encoding/protojson"
+)
+
+type FilerClientAccessor struct {
+ GetFiler func() pb.ServerAddress
+ GetGrpcDialOption func()grpc.DialOption
+}
+
+func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
+}
+
+func (fca *FilerClientAccessor) SaveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error {
+
+ glog.V(0).Infof("save conf for topic %v to filer", t)
+
+ // save the topic configuration on filer
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ if err := fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ var buf bytes.Buffer
+ filer.ProtoToText(&buf, conf)
+ return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
+ }); err != nil {
+ return fmt.Errorf("save topic to %s: %v", topicDir, err)
+ }
+ return nil
+}
+
+func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
+
+ glog.V(0).Infof("load conf for topic %v from filer", t)
+
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
+ if err == filer_pb.ErrNotFound {
+ return err
+ }
+ if err != nil {
+ return fmt.Errorf("read topic.conf of %v: %v", t, err)
+ }
+ // parse into filer conf object
+ conf = &mq_pb.ConfigureTopicResponse{}
+ if err = jsonpb.Unmarshal(data, conf); err != nil {
+ return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
+ }
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+
+ return conf, nil
+}