diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-05-03 00:24:35 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-05-03 00:24:35 -0700 |
| commit | b30c14b6314c96e0cb0c110e2aa1fc206857a066 (patch) | |
| tree | 88e6fc7e72b1f341a67c8d792dff4732a7c87f80 /weed/filesys | |
| parent | 47f14775d7d9a47e0d123aa174fb1f1f75bce547 (diff) | |
| download | seaweedfs-b30c14b6314c96e0cb0c110e2aa1fc206857a066.tar.xz seaweedfs-b30c14b6314c96e0cb0c110e2aa1fc206857a066.zip | |
webdav: can read now
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 92 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 2 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 2 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 2 | ||||
| -rw-r--r-- | weed/filesys/file.go | 4 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 89 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 4 | ||||
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 2 |
8 files changed, 30 insertions, 167 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 767212103..7b6cf2000 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -4,7 +4,6 @@ import ( "context" "os" "path" - "path/filepath" "time" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -54,39 +53,12 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { return nil } - parent, name := filepath.Split(dir.Path) - - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: parent, - Name: name, - } - - glog.V(1).Infof("read dir %s request: %v", dir.Path, request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - if err == filer2.ErrNotFound { - return nil - } - glog.V(0).Infof("read dir %s attr %v: %v", dir.Path, request, err) - return err - } - - if resp.Entry != nil { - dir.attributes = resp.Entry.Attributes - } - glog.V(2).Infof("read dir %s attr: %v", dir.Path, dir.attributes) - - // dir.wfs.listDirectoryEntriesCache.Set(dir.Path, resp.Entry, dir.wfs.option.EntryCacheTtl) - - return nil - }) - + entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path) if err != nil { glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err) return err } + dir.attributes = entry.Attributes glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode)) @@ -133,7 +105,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, glog.V(1).Infof("create: %v", request) if request.Entry.IsDirectory { - if err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if _, err := client.CreateEntry(ctx, request); err != nil { glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err) return fuse.EIO @@ -156,7 +128,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: dir.Path, @@ -193,33 +165,18 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { var entry *filer_pb.Entry + fullFilePath := path.Join(dir.Path, req.Name) - item := dir.wfs.listDirectoryEntriesCache.Get(path.Join(dir.Path, req.Name)) + item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath) if item != nil && !item.Expired() { entry = item.Value().(*filer_pb.Entry) } if entry == nil { - err = dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.Name, - } - - glog.V(4).Infof("lookup directory entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) - return fuse.ENOENT - } - - entry = resp.Entry - - // dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, dir.wfs.option.EntryCacheTtl) - - return nil - }) + entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath) + if err != nil { + return nil, err + } } if entry != nil { @@ -244,7 +201,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - err = dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { paginationLimit := 1024 remaining := dir.wfs.option.DirListingLimit @@ -306,33 +263,14 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { - var entry *filer_pb.Entry - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.Name, - } - - glog.V(4).Infof("lookup to-be-removed entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) - return fuse.ENOENT - } - - entry = resp.Entry - - return nil - }) - + entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name)) if err != nil { return err } dir.wfs.deleteFileChunks(ctx, entry.Chunks) - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -356,7 +294,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -402,7 +340,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus } parentDir, name := filer2.FullPath(dir.Path).DirAndName() - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 4f631bc88..92cf04d58 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -35,7 +35,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, }, } - err := dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if _, err := client.CreateEntry(ctx, request); err != nil { glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err) return fuse.EIO diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 7a415ff82..e72a15758 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -12,7 +12,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector newDir := newDirectory.(*Dir) - return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.Path, diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 0044cfd87..5a7d51a91 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -167,7 +167,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte var fileId, host string var auth security.EncodedJwt - if err := pages.f.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/filesys/file.go b/weed/filesys/file.go index eb4b03f64..3015354a6 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -109,7 +109,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - return file.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.Path, @@ -144,7 +144,7 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error { file.setEntry(entry) // glog.V(1).Infof("file attr read cached %v attributes", file.Name) } else { - err := file.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: file.Name, diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index e87e0608e..feb19f525 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,20 +3,18 @@ package filesys import ( "context" "fmt" + "mime" + "path" + "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/gabriel-vasile/mimetype" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" "google.golang.org/grpc" - "mime" - "path" - "strings" - "sync" - "time" ) type FileHandle struct { @@ -68,72 +66,7 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size) - var vids []string - for _, chunkView := range chunkViews { - vids = append(vids, volumeId(chunkView.FileId)) - } - - vid2Locations := make(map[string]*filer_pb.Locations) - - err := fh.f.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return err - } - - vid2Locations = resp.LocationsMap - - return nil - }) - - if err != nil { - glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err) - return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) - } - - var totalRead int64 - var wg sync.WaitGroup - for _, chunkView := range chunkViews { - wg.Add(1) - go func(chunkView *filer2.ChunkView) { - defer wg.Done() - - glog.V(4).Infof("read fh reading chunk: %+v", chunkView) - - locations := vid2Locations[volumeId(chunkView.FileId)] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", chunkView.FileId) - err = fmt.Errorf("failed to locate %s", chunkView.FileId) - return - } - - var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)], - !chunkView.IsFullChunk) - - if err != nil { - - glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err) - - err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) - return - } - - glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) - totalRead += n - - }(chunkView) - } - wg.Wait() + totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset) resp.Data = buff[:totalRead] @@ -205,7 +138,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - return fh.f.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -243,7 +176,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f var vids []string for _, fileId := range fileIds { - vids = append(vids, volumeId(fileId)) + vids = append(vids, filer2.VolumeId(fileId)) } lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { @@ -280,11 +213,3 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f return err } - -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index f8be24e5e..b4d1b0608 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -73,7 +73,7 @@ func (wfs *WFS) Root() (fs.Node, error) { return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil } -func (wfs *WFS) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) @@ -133,7 +133,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 16f8af594..dd7992816 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -15,7 +15,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu fileIds = append(fileIds, chunk.FileId) } - wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds) return nil }) |
