diff options
Diffstat (limited to 'weed/mq/client/pub_client/lookup.go')
| -rw-r--r-- | weed/mq/client/pub_client/lookup.go | 116 |
1 files changed, 42 insertions, 74 deletions
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index 28cb29015..e55bfd256 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -5,11 +5,28 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) -func (p *TopicPublisher) doLookup(brokerAddress string) error { +func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { + if p.config.CreateTopic { + err := pb.WithBrokerGrpcClient(true, + brokerAddress, + p.grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + PartitionCount: p.config.CreateTopicPartitionCount, + }) + return err + }) + if err != nil { + return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) + } + } + err := pb.WithBrokerGrpcClient(true, brokerAddress, p.grpcDialOption, @@ -22,21 +39,36 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error { }, IsForPublish: true, }) + if p.config.CreateTopic && err != nil { + _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + PartitionCount: p.config.CreateTopicPartitionCount, + }) + if err != nil { + return err + } + lookupResp, err = client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + IsForPublish: true, + }) + } if err != nil { return err } + for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { // partition => publishClient - publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) + publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) if err != nil { return err } - for redirectTo != "" { - publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo) - if err != nil { - return err - } - } p.partition2Broker.Insert( brokerPartitionAssignment.Partition.RangeStart, brokerPartitionAssignment.Partition.RangeStop, @@ -50,67 +82,3 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error { } return nil } - -// broker => publish client -// send init message -// save the publishing client -func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) { - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) - if err != nil { - return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err) - } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.Publish(context.Background()) - if err != nil { - return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err) - } - publishClient = &PublishClient{ - SeaweedMessaging_PublishClient: stream, - Broker: brokerAddress, - } - if err = publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - }, - AckInterval: 128, - }, - }, - }); err != nil { - return publishClient, redirectTo, fmt.Errorf("send init message: %v", err) - } - resp, err := stream.Recv() - if err != nil { - return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err) - } - if resp.Error != "" { - return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error) - } - if resp.RedirectToBroker != "" { - redirectTo = resp.RedirectToBroker - return publishClient, redirectTo, nil - } - - go func() { - for { - _, err := publishClient.Recv() - if err != nil { - e, ok := status.FromError(err) - if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { - return - } - publishClient.Err = err - fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) - return - } - } - }() - return publishClient, redirectTo, nil -} |
