diff options
Diffstat (limited to 'weed/mq/msgclient/client.go')
| -rw-r--r-- | weed/mq/msgclient/client.go | 55 |
1 files changed, 0 insertions, 55 deletions
diff --git a/weed/mq/msgclient/client.go b/weed/mq/msgclient/client.go deleted file mode 100644 index cc64f1acb..000000000 --- a/weed/mq/msgclient/client.go +++ /dev/null @@ -1,55 +0,0 @@ -package msgclient - -import ( - "context" - "fmt" - "log" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnections map[broker.TopicPartition]*grpc.ClientConn - grpcDialOption grpc.DialOption -} - -func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), - } -} - -func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { - - for _, broker := range mc.bootstrapBrokers { - grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) - if err != nil { - log.Printf("dial broker %s: %v", broker, err) - continue - } - defer grpcConnection.Close() - - resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), - &mq_pb.FindBrokerRequest{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Parition: tp.Partition, - }) - if err != nil { - return nil, err - } - - targetBroker := resp.Broker - return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) - } - return nil, fmt.Errorf("no broker found for %+v", tp) -} |
