diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-01 22:43:25 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-28 23:22:06 -0700 |
| commit | 21b6b07dd8d0379d835f9d9c1259155a12f1e61b (patch) | |
| tree | c3b13d69cac50afc227b1a06d34082cf3598f98a /weed/messaging/msgclient/client.go | |
| parent | 8c4edf7b4014b157ee269419febe57af9cd67618 (diff) | |
| download | seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.tar.xz seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.zip | |
renaming
Diffstat (limited to 'weed/messaging/msgclient/client.go')
| -rw-r--r-- | weed/messaging/msgclient/client.go | 55 |
1 files changed, 0 insertions, 55 deletions
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go deleted file mode 100644 index 4d7ef2b8e..000000000 --- a/weed/messaging/msgclient/client.go +++ /dev/null @@ -1,55 +0,0 @@ -package msgclient - -import ( - "context" - "fmt" - "log" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnections map[broker.TopicPartition]*grpc.ClientConn - grpcDialOption grpc.DialOption -} - -func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), - } -} - -func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { - - for _, broker := range mc.bootstrapBrokers { - grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) - if err != nil { - log.Printf("dial broker %s: %v", broker, err) - continue - } - defer grpcConnection.Close() - - resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), - &messaging_pb.FindBrokerRequest{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Parition: tp.Partition, - }) - if err != nil { - return nil, err - } - - targetBroker := resp.Broker - return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) - } - return nil, fmt.Errorf("no broker found for %+v", tp) -} |
