aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-01-24 01:40:51 -0800
committerChris Lu <chris.lu@gmail.com>2020-01-24 01:40:51 -0800
commit107e8a56ea0fee9eff996177003b32a0179d7651 (patch)
tree9b16183cb0e27f5ef6ada86666370511d86ba735
parent2f75264ec7928e362f54d0be2453168d21c82834 (diff)
downloadseaweedfs-107e8a56ea0fee9eff996177003b32a0179d7651.tar.xz
seaweedfs-107e8a56ea0fee9eff996177003b32a0179d7651.zip
retry context canceled request
-rw-r--r--weed/filesys/wfs.go24
-rw-r--r--weed/util/grpc_client_server.go9
2 files changed, 27 insertions, 6 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 4cfab811b..bc78a0dbe 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"os"
+ "strings"
"sync"
"time"
@@ -47,8 +48,8 @@ type WFS struct {
listDirectoryEntriesCache *ccache.Cache
// contains all open handles, protected by handlesLock
- handlesLock sync.Mutex
- handles []*FileHandle
+ handlesLock sync.Mutex
+ handles []*FileHandle
pathToHandleIndex map[filer2.FullPath]int
bufPool sync.Pool
@@ -89,11 +90,24 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ err := util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ if err == nil {
+ return nil
+ }
+ if strings.Contains(err.Error(), "context canceled") {
+ time.Sleep(1337 * time.Millisecond)
+ glog.V(2).Infoln("retry context canceled request...")
+ return util.WithCachedGrpcClient(context.Background(), func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ }
+ return err
+
}
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
@@ -116,7 +130,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handles[i] = fileHandle
fileHandle.handle = uint64(i)
wfs.pathToHandleIndex[fullpath] = i
- glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle)
+ glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
return
}
}
@@ -124,7 +138,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handles = append(wfs.handles, fileHandle)
fileHandle.handle = uint64(len(wfs.handles) - 1)
wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
- glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle)
+ glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
}
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
index 31497ad35..63519d97a 100644
--- a/weed/util/grpc_client_server.go
+++ b/weed/util/grpc_client_server.go
@@ -64,7 +64,14 @@ func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error,
existingConnection, found := grpcClients[address]
if found {
grpcClientsLock.Unlock()
- return fn(existingConnection)
+ err := fn(existingConnection)
+ if err != nil {
+ grpcClientsLock.Lock()
+ delete(grpcClients, address)
+ grpcClientsLock.Unlock()
+ existingConnection.Close()
+ }
+ return err
}
grpcConnection, err := GrpcDial(ctx, address, opts...)