diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-01-24 01:40:51 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-01-24 01:40:51 -0800 |
| commit | 107e8a56ea0fee9eff996177003b32a0179d7651 (patch) | |
| tree | 9b16183cb0e27f5ef6ada86666370511d86ba735 | |
| parent | 2f75264ec7928e362f54d0be2453168d21c82834 (diff) | |
| download | seaweedfs-107e8a56ea0fee9eff996177003b32a0179d7651.tar.xz seaweedfs-107e8a56ea0fee9eff996177003b32a0179d7651.zip | |
retry context canceled request
| -rw-r--r-- | weed/filesys/wfs.go | 24 | ||||
| -rw-r--r-- | weed/util/grpc_client_server.go | 9 |
2 files changed, 27 insertions, 6 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 4cfab811b..bc78a0dbe 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "os" + "strings" "sync" "time" @@ -47,8 +48,8 @@ type WFS struct { listDirectoryEntriesCache *ccache.Cache // contains all open handles, protected by handlesLock - handlesLock sync.Mutex - handles []*FileHandle + handlesLock sync.Mutex + handles []*FileHandle pathToHandleIndex map[filer2.FullPath]int bufPool sync.Pool @@ -89,11 +90,24 @@ func (wfs *WFS) Root() (fs.Node, error) { func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + err := util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + if err == nil { + return nil + } + if strings.Contains(err.Error(), "context canceled") { + time.Sleep(1337 * time.Millisecond) + glog.V(2).Infoln("retry context canceled request...") + return util.WithCachedGrpcClient(context.Background(), func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + } + return err + } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { @@ -116,7 +130,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handles[i] = fileHandle fileHandle.handle = uint64(i) wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle) + glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle) return } } @@ -124,7 +138,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handles = append(wfs.handles, fileHandle) fileHandle.handle = uint64(len(wfs.handles) - 1) wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) - glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle) + glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index 31497ad35..63519d97a 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -64,7 +64,14 @@ func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, existingConnection, found := grpcClients[address] if found { grpcClientsLock.Unlock() - return fn(existingConnection) + err := fn(existingConnection) + if err != nil { + grpcClientsLock.Lock() + delete(grpcClients, address) + grpcClientsLock.Unlock() + existingConnection.Close() + } + return err } grpcConnection, err := GrpcDial(ctx, address, opts...) |
