diff options
Diffstat (limited to 'weed/pb/grpc_client_server.go')
| -rw-r--r-- | weed/pb/grpc_client_server.go | 73 |
1 files changed, 44 insertions, 29 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 08fb7fac7..532c66932 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -97,7 +97,8 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return existingConnection, nil } - grpcConnection, err := GrpcDial(context.Background(), address, opts...) + ctx := context.Background() + grpcConnection, err := GrpcDial(ctx, address, opts...) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", address, err) } @@ -112,28 +113,42 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return vgc, nil } -func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. +func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - vgc, err := getOrCreateConnection(address, opts...) - if err != nil { - return fmt.Errorf("getOrCreateConnection %s: %v", address, err) - } - executionErr := fn(vgc.ClientConn) - if executionErr != nil { - if strings.Contains(executionErr.Error(), "transport") || - strings.Contains(executionErr.Error(), "connection closed") { - grpcClientsLock.Lock() - if t, ok := grpcClients[address]; ok { - if t.version == vgc.version { - vgc.Close() - delete(grpcClients, address) + if !streamingMode { + vgc, err := getOrCreateConnection(address, opts...) + if err != nil { + return fmt.Errorf("getOrCreateConnection %s: %v", address, err) + } + executionErr := fn(vgc.ClientConn) + if executionErr != nil { + if strings.Contains(executionErr.Error(), "transport") || + strings.Contains(executionErr.Error(), "connection closed") { + grpcClientsLock.Lock() + if t, ok := grpcClients[address]; ok { + if t.version == vgc.version { + vgc.Close() + delete(grpcClients, address) + } } + grpcClientsLock.Unlock() } - grpcClientsLock.Unlock() } + return executionErr + } else { + grpcConnection, err := GrpcDial(context.Background(), address, opts...) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", address, err) + } + defer grpcConnection.Close() + executionErr := fn(grpcConnection) + if executionErr != nil { + return executionErr + } + return nil } - return executionErr } func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { @@ -184,18 +199,18 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { return util.JoinHostPort(host, port) } -func WithMasterClient(master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { +func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, master.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { +func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption) @@ -207,34 +222,34 @@ func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOpt return err } -func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { +func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := messaging_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, grpcDialOption) } -func WithFilerClient(filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcFilerClient(filer, grpcDialOption, fn) + return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn) } -func WithGrpcFilerClient(filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcFilerClients(filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { +func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { for _, filerAddress := range filerAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerAddress.ToGrpcAddress(), grpcDialOption) |
