aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go38
1 files changed, 24 insertions, 14 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index e8238a5f7..3b585f6f6 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -149,26 +149,17 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
+ self := b.option.BrokerAddress()
+ glog.V(0).Infof("broker %s load topic %v partition %v", self, t, p)
+
// load local topic partition from configuration on filer if not found
var conf *mq_pb.ConfigureTopicResponse
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
- if err != nil {
- return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err)
- }
- // parse into filer conf object
- conf = &mq_pb.ConfigureTopicResponse{}
- if err = jsonpb.Unmarshal(data, conf); err != nil {
- return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err)
- }
- return nil
- }); err != nil {
+ conf, err = b.readTopicConfFromFiler(t, p)
+ if err != nil {
return nil, err
}
// create local topic partition
- self := b.option.BrokerAddress()
var hasCreated bool
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) {
@@ -186,6 +177,25 @@ func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p t
return localTopicPartition, nil
}
+func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic, p topic.Partition) (conf *mq_pb.ConfigureTopicResponse, err error) {
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
+ if err != nil {
+ return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err)
+ }
+ // parse into filer conf object
+ conf = &mq_pb.ConfigureTopicResponse{}
+ if err = jsonpb.Unmarshal(data, conf); err != nil {
+ return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err)
+ }
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+ return conf, err
+}
+
// duplicated from master_grpc_server.go
func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx)