aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/msgclient/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/msgclient/client.go')
-rw-r--r--weed/mq/msgclient/client.go55
1 files changed, 0 insertions, 55 deletions
diff --git a/weed/mq/msgclient/client.go b/weed/mq/msgclient/client.go
deleted file mode 100644
index cc64f1acb..000000000
--- a/weed/mq/msgclient/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package msgclient
-
-import (
- "context"
- "fmt"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnections map[broker.TopicPartition]*grpc.ClientConn
- grpcDialOption grpc.DialOption
-}
-
-func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
- }
-}
-
-func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
-
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
- &mq_pb.FindBrokerRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Parition: tp.Partition,
- })
- if err != nil {
- return nil, err
- }
-
- targetBroker := resp.Broker
- return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
- }
- return nil, fmt.Errorf("no broker found for %+v", tp)
-}