diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-05-22 17:46:53 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-05-22 17:46:53 -0700 |
| commit | 3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9 (patch) | |
| tree | 88103f917f69f9ee78cb50991ebe95c000025f18 | |
| parent | 2d0a0733ec36596b07a1b9e3d01079e5d4498a1d (diff) | |
| download | seaweedfs-3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9.tar.xz seaweedfs-3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9.zip | |
filer: re-create grpc connections if having transport error
fix https://github.com/chrislusf/seaweedfs/issues/2070
| -rw-r--r-- | weed/pb/grpc_client_server.go | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index cdac0ba99..edb60e4fa 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "math/rand" "net/http" "strconv" "strings" @@ -24,10 +25,15 @@ const ( var ( // cache grpc connections - grpcClients = make(map[string]*grpc.ClientConn) + grpcClients = make(map[string]*versionedGrpcClient) grpcClientsLock sync.Mutex ) +type versionedGrpcClient struct { + *grpc.ClientConn + version int +} + func init() { http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024 http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024 @@ -79,7 +85,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr return grpc.DialContext(ctx, address, options...) } -func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) { grpcClientsLock.Lock() defer grpcClientsLock.Unlock() @@ -94,18 +100,34 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.Clien return nil, fmt.Errorf("fail to dial %s: %v", address, err) } - grpcClients[address] = grpcConnection + vgc := &versionedGrpcClient{ + grpcConnection, + rand.Int(), + } + grpcClients[address] = vgc - return grpcConnection, nil + return vgc, nil } func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - grpcConnection, err := getOrCreateConnection(address, opts...) + vgc, err := getOrCreateConnection(address, opts...) if err != nil { return fmt.Errorf("getOrCreateConnection %s: %v", address, err) } - return fn(grpcConnection) + executionErr := fn(vgc.ClientConn) + if executionErr != nil && strings.Contains(executionErr.Error(), "transport") { + grpcClientsLock.Lock() + if t, ok := grpcClients[address]; ok { + if t.version == vgc.version { + vgc.Close() + delete(grpcClients, address) + } + } + grpcClientsLock.Unlock() + } + + return executionErr } func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) { |
