aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/grpc_client_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/grpc_client_server.go')
-rw-r--r--weed/pb/grpc_client_server.go73
1 files changed, 44 insertions, 29 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 08fb7fac7..532c66932 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -97,7 +97,8 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG
return existingConnection, nil
}
- grpcConnection, err := GrpcDial(context.Background(), address, opts...)
+ ctx := context.Background()
+ grpcConnection, err := GrpcDial(ctx, address, opts...)
if err != nil {
return nil, fmt.Errorf("fail to dial %s: %v", address, err)
}
@@ -112,28 +113,42 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG
return vgc, nil
}
-func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
+// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
+func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
- vgc, err := getOrCreateConnection(address, opts...)
- if err != nil {
- return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
- }
- executionErr := fn(vgc.ClientConn)
- if executionErr != nil {
- if strings.Contains(executionErr.Error(), "transport") ||
- strings.Contains(executionErr.Error(), "connection closed") {
- grpcClientsLock.Lock()
- if t, ok := grpcClients[address]; ok {
- if t.version == vgc.version {
- vgc.Close()
- delete(grpcClients, address)
+ if !streamingMode {
+ vgc, err := getOrCreateConnection(address, opts...)
+ if err != nil {
+ return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
+ }
+ executionErr := fn(vgc.ClientConn)
+ if executionErr != nil {
+ if strings.Contains(executionErr.Error(), "transport") ||
+ strings.Contains(executionErr.Error(), "connection closed") {
+ grpcClientsLock.Lock()
+ if t, ok := grpcClients[address]; ok {
+ if t.version == vgc.version {
+ vgc.Close()
+ delete(grpcClients, address)
+ }
}
+ grpcClientsLock.Unlock()
}
- grpcClientsLock.Unlock()
}
+ return executionErr
+ } else {
+ grpcConnection, err := GrpcDial(context.Background(), address, opts...)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", address, err)
+ }
+ defer grpcConnection.Close()
+ executionErr := fn(grpcConnection)
+ if executionErr != nil {
+ return executionErr
+ }
+ return nil
}
- return executionErr
}
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
@@ -184,18 +199,18 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
return util.JoinHostPort(host, port)
}
-func WithMasterClient(master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
- return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, master.ToGrpcAddress(), grpcDialOption)
}
-func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
+func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses {
- err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress.ToGrpcAddress(), grpcDialOption)
@@ -207,34 +222,34 @@ func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOpt
return err
}
-func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
- return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, brokerGrpcAddress, grpcDialOption)
}
-func WithFilerClient(filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
+func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
- return WithGrpcFilerClient(filer, grpcDialOption, fn)
+ return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn)
}
-func WithGrpcFilerClient(filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
+func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
- return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress.ToGrpcAddress(), grpcDialOption)
}
-func WithOneOfGrpcFilerClients(filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
+func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
for _, filerAddress := range filerAddresses {
- err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerAddress.ToGrpcAddress(), grpcDialOption)