aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/connect.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/connect.go')
-rw-r--r--weed/mq/client/pub_client/connect.go73
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go
new file mode 100644
index 000000000..fc7ff4d77
--- /dev/null
+++ b/weed/mq/client/pub_client/connect.go
@@ -0,0 +1,73 @@
+package pub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "log"
+)
+
+// broker => publish client
+// send init message
+// save the publishing client
+func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
+ log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
+
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
+ if err != nil {
+ return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ stream, err := brokerClient.Publish(context.Background())
+ if err != nil {
+ return publishClient, fmt.Errorf("create publish client: %v", err)
+ }
+ publishClient = &PublishClient{
+ SeaweedMessaging_PublishClient: stream,
+ Broker: brokerAddress,
+ }
+ if err = publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Init{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: partition.RingSize,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ },
+ AckInterval: 128,
+ },
+ },
+ }); err != nil {
+ return publishClient, fmt.Errorf("send init message: %v", err)
+ }
+ resp, err := stream.Recv()
+ if err != nil {
+ return publishClient, fmt.Errorf("recv init response: %v", err)
+ }
+ if resp.Error != "" {
+ return publishClient, fmt.Errorf("init response error: %v", resp.Error)
+ }
+
+ go func() {
+ for {
+ _, err := publishClient.Recv()
+ if err != nil {
+ e, ok := status.FromError(err)
+ if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
+ return
+ }
+ publishClient.Err = err
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
+ return
+ }
+ }
+ }()
+ return publishClient, nil
+}