aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/pub_client/connect.go71
1 files changed, 0 insertions, 71 deletions
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go
deleted file mode 100644
index 045c9593c..000000000
--- a/weed/mq/client/pub_client/connect.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package pub_client
-
-import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "log"
-)
-
-// broker => publish client
-// send init message
-// save the publishing client
-func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
- log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
-
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
- if err != nil {
- return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.PublishMessage(context.Background())
- if err != nil {
- return publishClient, fmt.Errorf("create publish client: %v", err)
- }
- publishClient = &PublishClient{
- SeaweedMessaging_PublishMessageClient: stream,
- Broker: brokerAddress,
- }
- if err = publishClient.Send(&mq_pb.PublishMessageRequest{
- Message: &mq_pb.PublishMessageRequest_Init{
- Init: &mq_pb.PublishMessageRequest_InitMessage{
- Topic: p.config.Topic.ToPbTopic(),
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- UnixTimeNs: partition.UnixTimeNs,
- },
- AckInterval: 128,
- },
- },
- }); err != nil {
- return publishClient, fmt.Errorf("send init message: %v", err)
- }
- resp, err := stream.Recv()
- if err != nil {
- return publishClient, fmt.Errorf("recv init response: %v", err)
- }
- if resp.Error != "" {
- return publishClient, fmt.Errorf("init response error: %v", resp.Error)
- }
-
- go func() {
- for {
- _, err := publishClient.Recv()
- if err != nil {
- e, ok := status.FromError(err)
- if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
- return
- }
- publishClient.Err = err
- fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
- return
- }
- }
- }()
- return publishClient, nil
-}