diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-28 22:03:13 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-28 22:03:13 -0800 |
| commit | 545d5fbdf6308512cfc3833cdba8539859d496c4 (patch) | |
| tree | 7bd470b8a8b50ee04071a07dd2779f29ef2aa740 | |
| parent | a507069d19acaa3f5040e4608178ce073e0a1791 (diff) | |
| download | seaweedfs-545d5fbdf6308512cfc3833cdba8539859d496c4.tar.xz seaweedfs-545d5fbdf6308512cfc3833cdba8539859d496c4.zip | |
unused code
| -rw-r--r-- | weed/mq/client/pub_client/connect.go | 71 |
1 files changed, 0 insertions, 71 deletions
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go deleted file mode 100644 index 045c9593c..000000000 --- a/weed/mq/client/pub_client/connect.go +++ /dev/null @@ -1,71 +0,0 @@ -package pub_client - -import ( - "context" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "log" -) - -// broker => publish client -// send init message -// save the publishing client -func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) { - log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition) - - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) - if err != nil { - return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) - } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.PublishMessage(context.Background()) - if err != nil { - return publishClient, fmt.Errorf("create publish client: %v", err) - } - publishClient = &PublishClient{ - SeaweedMessaging_PublishMessageClient: stream, - Broker: brokerAddress, - } - if err = publishClient.Send(&mq_pb.PublishMessageRequest{ - Message: &mq_pb.PublishMessageRequest_Init{ - Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: p.config.Topic.ToPbTopic(), - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - UnixTimeNs: partition.UnixTimeNs, - }, - AckInterval: 128, - }, - }, - }); err != nil { - return publishClient, fmt.Errorf("send init message: %v", err) - } - resp, err := stream.Recv() - if err != nil { - return publishClient, fmt.Errorf("recv init response: %v", err) - } - if resp.Error != "" { - return publishClient, fmt.Errorf("init response error: %v", resp.Error) - } - - go func() { - for { - _, err := publishClient.Recv() - if err != nil { - e, ok := status.FromError(err) - if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { - return - } - publishClient.Err = err - fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) - return - } - } - }() - return publishClient, nil -} |
