diff options
Diffstat (limited to 'weed/operation/grpc_client.go')
| -rw-r--r-- | weed/operation/grpc_client.go | 73 |
1 files changed, 70 insertions, 3 deletions
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index e7ee2d2ba..c7a8d2ffd 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -3,13 +3,22 @@ package operation import ( "context" "fmt" + "net" + "strconv" + "strings" + "sync" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "strconv" - "strings" +) + +var ( + connectionPool = make(map[string]*sync.Pool) + connectionPoolLock sync.Mutex ) func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error { @@ -38,6 +47,64 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } +func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) error) error { + tcpAddress, err := toVolumeServerTcpAddress(volumeServer) + if err != nil { + return err + } + + conn := getConnection(tcpAddress) + defer releaseConnection(conn, tcpAddress) + + err = fn(conn) + return err +} + +func getConnection(tcpAddress string) net.Conn { + connectionPoolLock.Lock() + defer connectionPoolLock.Unlock() + + pool, found := connectionPool[tcpAddress] + if !found { + pool = &sync.Pool{New: func() interface{} { + conn, err := net.Dial("tcp", tcpAddress) + if err != nil { + glog.Errorf("failed to connect to %s: %v", tcpAddress, err) + return conn + } + // println("connected", tcpAddress, "=>", conn.LocalAddr().String()) + return conn + }} + connectionPool[tcpAddress] = pool + } + conn := pool.Get().(net.Conn) + // println("get connection", tcpAddress, "=>", conn.LocalAddr().String()) + return conn +} + +func releaseConnection(conn net.Conn, tcpAddress string) { + connectionPoolLock.Lock() + defer connectionPoolLock.Unlock() + + pool, found := connectionPool[tcpAddress] + if !found { + // println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String()) + return + } + pool.Put(conn) + // println("returned connection", tcpAddress, "=>", conn.LocalAddr().String()) +} + +func toVolumeServerTcpAddress(volumeServer string) (grpcAddress string, err error) { + sepIndex := strings.LastIndex(volumeServer, ":") + port, err := strconv.Atoi(volumeServer[sepIndex+1:]) + if err != nil { + glog.Errorf("failed to parse volume server address: %v", volumeServer) + return "", err + } + return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+20000), nil +} + func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error { ctx := context.Background() |
