diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
| commit | 9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch) | |
| tree | 1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/pb/grpc_client_server.go | |
| parent | c935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff) | |
| download | seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip | |
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/pb/grpc_client_server.go')
| -rw-r--r-- | weed/pb/grpc_client_server.go | 73 |
1 files changed, 44 insertions, 29 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 08fb7fac7..532c66932 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -97,7 +97,8 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return existingConnection, nil } - grpcConnection, err := GrpcDial(context.Background(), address, opts...) + ctx := context.Background() + grpcConnection, err := GrpcDial(ctx, address, opts...) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", address, err) } @@ -112,28 +113,42 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return vgc, nil } -func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. +func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - vgc, err := getOrCreateConnection(address, opts...) - if err != nil { - return fmt.Errorf("getOrCreateConnection %s: %v", address, err) - } - executionErr := fn(vgc.ClientConn) - if executionErr != nil { - if strings.Contains(executionErr.Error(), "transport") || - strings.Contains(executionErr.Error(), "connection closed") { - grpcClientsLock.Lock() - if t, ok := grpcClients[address]; ok { - if t.version == vgc.version { - vgc.Close() - delete(grpcClients, address) + if !streamingMode { + vgc, err := getOrCreateConnection(address, opts...) + if err != nil { + return fmt.Errorf("getOrCreateConnection %s: %v", address, err) + } + executionErr := fn(vgc.ClientConn) + if executionErr != nil { + if strings.Contains(executionErr.Error(), "transport") || + strings.Contains(executionErr.Error(), "connection closed") { + grpcClientsLock.Lock() + if t, ok := grpcClients[address]; ok { + if t.version == vgc.version { + vgc.Close() + delete(grpcClients, address) + } } + grpcClientsLock.Unlock() } - grpcClientsLock.Unlock() } + return executionErr + } else { + grpcConnection, err := GrpcDial(context.Background(), address, opts...) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", address, err) + } + defer grpcConnection.Close() + executionErr := fn(grpcConnection) + if executionErr != nil { + return executionErr + } + return nil } - return executionErr } func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { @@ -184,18 +199,18 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { return util.JoinHostPort(host, port) } -func WithMasterClient(master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { +func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, master.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { +func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption) @@ -207,34 +222,34 @@ func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOpt return err } -func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { +func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := messaging_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, grpcDialOption) } -func WithFilerClient(filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcFilerClient(filer, grpcDialOption, fn) + return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn) } -func WithGrpcFilerClient(filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcFilerClients(filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { +func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { for _, filerAddress := range filerAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerAddress.ToGrpcAddress(), grpcDialOption) |
