aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/client.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-01 22:43:25 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:22:06 -0700
commit21b6b07dd8d0379d835f9d9c1259155a12f1e61b (patch)
treec3b13d69cac50afc227b1a06d34082cf3598f98a /weed/messaging/msgclient/client.go
parent8c4edf7b4014b157ee269419febe57af9cd67618 (diff)
downloadseaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.tar.xz
seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.zip
renaming
Diffstat (limited to 'weed/messaging/msgclient/client.go')
-rw-r--r--weed/messaging/msgclient/client.go55
1 files changed, 0 insertions, 55 deletions
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
deleted file mode 100644
index 4d7ef2b8e..000000000
--- a/weed/messaging/msgclient/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package msgclient
-
-import (
- "context"
- "fmt"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnections map[broker.TopicPartition]*grpc.ClientConn
- grpcDialOption grpc.DialOption
-}
-
-func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
- }
-}
-
-func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
-
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
- &messaging_pb.FindBrokerRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Parition: tp.Partition,
- })
- if err != nil {
- return nil, err
- }
-
- targetBroker := resp.Broker
- return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
- }
- return nil, fmt.Errorf("no broker found for %+v", tp)
-}