aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-05-22 17:46:53 -0700
committerChris Lu <chris.lu@gmail.com>2021-05-22 17:46:53 -0700
commit3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9 (patch)
tree88103f917f69f9ee78cb50991ebe95c000025f18
parent2d0a0733ec36596b07a1b9e3d01079e5d4498a1d (diff)
downloadseaweedfs-3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9.tar.xz
seaweedfs-3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9.zip
filer: re-create grpc connections if having transport error
fix https://github.com/chrislusf/seaweedfs/issues/2070
-rw-r--r--weed/pb/grpc_client_server.go34
1 files changed, 28 insertions, 6 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index cdac0ba99..edb60e4fa 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "math/rand"
"net/http"
"strconv"
"strings"
@@ -24,10 +25,15 @@ const (
var (
// cache grpc connections
- grpcClients = make(map[string]*grpc.ClientConn)
+ grpcClients = make(map[string]*versionedGrpcClient)
grpcClientsLock sync.Mutex
)
+type versionedGrpcClient struct {
+ *grpc.ClientConn
+ version int
+}
+
func init() {
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024
@@ -79,7 +85,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
return grpc.DialContext(ctx, address, options...)
}
-func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) {
grpcClientsLock.Lock()
defer grpcClientsLock.Unlock()
@@ -94,18 +100,34 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.Clien
return nil, fmt.Errorf("fail to dial %s: %v", address, err)
}
- grpcClients[address] = grpcConnection
+ vgc := &versionedGrpcClient{
+ grpcConnection,
+ rand.Int(),
+ }
+ grpcClients[address] = vgc
- return grpcConnection, nil
+ return vgc, nil
}
func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
- grpcConnection, err := getOrCreateConnection(address, opts...)
+ vgc, err := getOrCreateConnection(address, opts...)
if err != nil {
return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
}
- return fn(grpcConnection)
+ executionErr := fn(vgc.ClientConn)
+ if executionErr != nil && strings.Contains(executionErr.Error(), "transport") {
+ grpcClientsLock.Lock()
+ if t, ok := grpcClients[address]; ok {
+ if t.version == vgc.version {
+ vgc.Close()
+ delete(grpcClients, address)
+ }
+ }
+ grpcClientsLock.Unlock()
+ }
+
+ return executionErr
}
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {