diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
| commit | 9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch) | |
| tree | 1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/filesys | |
| parent | c935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff) | |
| download | seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip | |
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 6 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 4 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 2 | ||||
| -rw-r--r-- | weed/filesys/file.go | 4 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 2 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 2 | ||||
| -rw-r--r-- | weed/filesys/wfs_filer_client.go | 4 | ||||
| -rw-r--r-- | weed/filesys/wfs_write.go | 2 |
8 files changed, 13 insertions, 13 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index cedcf2d76..9d935e53c 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -201,7 +201,7 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex } glog.V(1).Infof("create %s/%s", dirFullPath, name) - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -242,7 +242,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err dirFullPath := dir.FullPath() - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(newEntry) defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) @@ -566,7 +566,7 @@ func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(entry) defer dir.wfs.mapPbIdFromFilerToLocal(entry) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index acdcd2de4..68f5a79e2 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -68,7 +68,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f } // apply changes to the filer, and also apply to local metaCache - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -121,7 +121,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, Signatures: []int32{dir.wfs.signature}, } - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 1ee6922d8..d8ea3e459 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -22,7 +22,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) // update remote filer - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 767841f9d..bbc5c8e21 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -331,7 +331,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { } func (file *File) saveEntry(entry *filer_pb.Entry) error { - return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { file.wfs.mapPbIdFromLocalToFiler(entry) defer file.wfs.mapPbIdFromFilerToLocal(entry) @@ -362,7 +362,7 @@ func (file *File) getEntry() *filer_pb.Entry { } func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { - err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: file.dir.FullPath(), diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 2aa4de6da..607b901ff 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -277,7 +277,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { return nil } - err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { entry := fh.f.getEntry() if entry == nil { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index aa4f9dacd..127c160c4 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -197,7 +197,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index cce6aa1a1..4feef867e 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -11,7 +11,7 @@ import ( var _ = filer_pb.FilerClient(&WFS{}) -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { return util.Retry("filer grpc", func() error { @@ -20,7 +20,7 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err for x := 0; x < n; x++ { filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress() - err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress, wfs.option.GrpcDialOption) diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 61a463e88..17489547c 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -19,7 +19,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun var fileId, host string var auth security.EncodedJwt - if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ Count: 1, |
