diff options
Diffstat (limited to 'weed/filesys')
23 files changed, 0 insertions, 3942 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go deleted file mode 100644 index 9a791e013..000000000 --- a/weed/filesys/dir.go +++ /dev/null @@ -1,618 +0,0 @@ -package filesys - -import ( - "bytes" - "context" - "math" - "os" - "strings" - "syscall" - "time" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type Dir struct { - name string - wfs *WFS - entry *filer_pb.Entry - parent *Dir - id uint64 -} - -var _ = fs.Node(&Dir{}) - -var _ = fs.NodeIdentifier(&Dir{}) -var _ = fs.NodeCreater(&Dir{}) -var _ = fs.NodeMknoder(&Dir{}) -var _ = fs.NodeMkdirer(&Dir{}) -var _ = fs.NodeFsyncer(&Dir{}) -var _ = fs.NodeRequestLookuper(&Dir{}) -var _ = fs.HandleReadDirAller(&Dir{}) -var _ = fs.NodeRemover(&Dir{}) -var _ = fs.NodeRenamer(&Dir{}) -var _ = fs.NodeSetattrer(&Dir{}) -var _ = fs.NodeGetxattrer(&Dir{}) -var _ = fs.NodeSetxattrer(&Dir{}) -var _ = fs.NodeRemovexattrer(&Dir{}) -var _ = fs.NodeListxattrer(&Dir{}) -var _ = fs.NodeForgetter(&Dir{}) - -func (dir *Dir) Id() uint64 { - if dir.parent == nil { - return 1 - } - return dir.id -} - -func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { - - entry, err := dir.maybeLoadEntry() - if err != nil { - glog.V(3).Infof("dir Attr %s, err: %+v", dir.FullPath(), err) - return err - } - - // https://github.com/bazil/fuse/issues/196 - attr.Valid = time.Second - attr.Inode = dir.Id() - attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir - attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) - attr.Ctime = time.Unix(entry.Attributes.Crtime, 0) - attr.Atime = time.Unix(entry.Attributes.Mtime, 0) - attr.Gid = entry.Attributes.Gid - attr.Uid = entry.Attributes.Uid - - if dir.FullPath() == dir.wfs.option.FilerMountRootPath { - attr.BlockSize = blockSize - } - - glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) - - return nil -} - -func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - - glog.V(4).Infof("dir Getxattr %s", dir.FullPath()) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - return getxattr(entry, req, resp) -} - -func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - // fsync works at OS level - // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req) - - return nil -} - -func (dir *Dir) newFile(name string) fs.Node { - - fileFullPath := util.NewFullPath(dir.FullPath(), name) - fileId := fileFullPath.AsInode() - dir.wfs.handlesLock.Lock() - existingHandle, found := dir.wfs.handles[fileId] - dir.wfs.handlesLock.Unlock() - - if found { - glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath) - return existingHandle.f - } - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - id: fileId, - } -} - -func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node { - - return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode()} - -} - -func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, - resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { - - exclusive := req.Flags&fuse.OpenExclusive != 0 - isDirectory := req.Mode&os.ModeDir > 0 - - if exclusive || isDirectory { - _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive) - if err != nil { - return nil, nil, err - } - } - var node fs.Node - if isDirectory { - node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name)) - return node, node, nil - } - - node = dir.newFile(req.Name) - file := node.(*File) - file.entry = &filer_pb.Entry{ - Name: req.Name, - IsDirectory: req.Mode&os.ModeDir > 0, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - Collection: dir.wfs.option.Collection, - Replication: dir.wfs.option.Replication, - TtlSec: dir.wfs.option.TtlSec, - }, - } - file.dirtyMetadata = true - fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0) - return file, fh, nil - -} - -func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) { - - _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false) - - if err != nil { - return nil, err - } - var node fs.Node - node = dir.newFile(req.Name) - return node, nil -} - -func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) { - dirFullPath := dir.FullPath() - request := &filer_pb.CreateEntryRequest{ - Directory: dirFullPath, - Entry: &filer_pb.Entry{ - Name: name, - IsDirectory: mode&os.ModeDir > 0, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(mode &^ dir.wfs.option.Umask), - Uid: uid, - Gid: gid, - Collection: dir.wfs.option.Collection, - Replication: dir.wfs.option.Replication, - TtlSec: dir.wfs.option.TtlSec, - }, - }, - OExcl: exclusive, - Signatures: []int32{dir.wfs.signature}, - } - glog.V(1).Infof("create %s/%s", dirFullPath, name) - - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.CreateEntry(client, request); err != nil { - if strings.Contains(err.Error(), "EEXIST") { - return fuse.EEXIST - } - glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err) - return fuse.EIO - } - - if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { - glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err) - return fuse.EIO - } - - return nil - }) - return request, err -} - -func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - - glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name) - - newEntry := &filer_pb.Entry{ - Name: req.Name, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - }, - } - - dirFullPath := dir.FullPath() - - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(newEntry) - defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) - - request := &filer_pb.CreateEntryRequest{ - Directory: dirFullPath, - Entry: newEntry, - Signatures: []int32{dir.wfs.signature}, - } - - glog.V(1).Infof("mkdir: %v", request) - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) - return err - } - - if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { - glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err) - return fuse.EIO - } - - return nil - }) - - if err == nil { - node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name)) - - return node, nil - } - - glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) - - return nil, fuse.EIO -} - -func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - - dirPath := util.FullPath(dir.FullPath()) - // glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String()) - - fullFilePath := dirPath.Child(req.Name) - visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) - if visitErr != nil { - glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) - return nil, fuse.EIO - } - localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - - if localEntry == nil { - // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) - entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath) - if err != nil { - glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) - return nil, fuse.ENOENT - } - localEntry = filer.FromPbEntry(string(dirPath), entry) - } else { - glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) - } - - if localEntry != nil { - if localEntry.IsDirectory() { - node = dir.newDirectory(fullFilePath) - } else { - node = dir.newFile(req.Name) - } - - // resp.EntryValid = time.Second - resp.Attr.Inode = fullFilePath.AsInode() - resp.Attr.Valid = time.Second - resp.Attr.Size = localEntry.FileSize - resp.Attr.Mtime = localEntry.Attr.Mtime - resp.Attr.Crtime = localEntry.Attr.Crtime - resp.Attr.Mode = localEntry.Attr.Mode - resp.Attr.Gid = localEntry.Attr.Gid - resp.Attr.Uid = localEntry.Attr.Uid - if localEntry.HardLinkCounter > 0 { - resp.Attr.Nlink = uint32(localEntry.HardLinkCounter) - } - - return node, nil - } - - glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err) - return nil, fuse.ENOENT -} - -func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - - dirPath := util.FullPath(dir.FullPath()) - glog.V(4).Infof("dir ReadDirAll %s", dirPath) - - processEachEntryFn := func(entry *filer.Entry, isLast bool) { - if entry.IsDirectory() { - dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode()} - ret = append(ret, dirent) - } else { - dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode()} - ret = append(ret, dirent) - } - } - - if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { - glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) - return nil, fuse.EIO - } - listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { - processEachEntryFn(entry, false) - return true - }) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return nil, fuse.EIO - } - - // create proper . and .. directories - ret = append(ret, fuse.Dirent{ - Inode: dirPath.AsInode(), - Name: ".", - Type: fuse.DT_Dir, - }) - - // return the correct parent inode for the mount root - var inode uint64 - if string(dirPath) == dir.wfs.option.FilerMountRootPath { - inode = dir.wfs.option.MountParentInode - } else { - inode = util.FullPath(dir.parent.FullPath()).AsInode() - } - - ret = append(ret, fuse.Dirent{ - Inode: inode, - Name: "..", - Type: fuse.DT_Dir, - }) - - return -} - -func findFileType(mode uint16) fuse.DirentType { - switch mode & (syscall.S_IFMT & 0xffff) { - case syscall.S_IFSOCK: - return fuse.DT_Socket - case syscall.S_IFLNK: - return fuse.DT_Link - case syscall.S_IFREG: - return fuse.DT_File - case syscall.S_IFBLK: - return fuse.DT_Block - case syscall.S_IFDIR: - return fuse.DT_Dir - case syscall.S_IFCHR: - return fuse.DT_Char - case syscall.S_IFIFO: - return fuse.DT_FIFO - } - return fuse.DT_File -} - -func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { - - if !req.Dir { - return dir.removeOneFile(req) - } - - return dir.removeFolder(req) - -} - -func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { - - dirFullPath := dir.FullPath() - filePath := util.NewFullPath(dirFullPath, req.Name) - entry, err := filer_pb.GetEntry(dir.wfs, filePath) - if err != nil { - return err - } - - // first, ensure the filer store can correctly delete - glog.V(3).Infof("remove file: %v", req) - isDeleteData := entry != nil && entry.HardLinkCounter <= 1 - err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature}) - if err != nil { - glog.V(3).Infof("not found remove file %s: %v", filePath, err) - return fuse.ENOENT - } - - // then, delete meta cache and fsNode cache - if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil { - glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err) - return fuse.ESTALE - } - - // remove current file handle if any - dir.wfs.handlesLock.Lock() - defer dir.wfs.handlesLock.Unlock() - inodeId := filePath.AsInode() - if fh, ok := dir.wfs.handles[inodeId]; ok { - delete(dir.wfs.handles, inodeId) - fh.isDeleted = true - } - - return nil - -} - -func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { - - dirFullPath := dir.FullPath() - glog.V(3).Infof("remove directory entry: %v", req) - ignoreRecursiveErr := true // ignore recursion error since the OS should manage it - err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature}) - if err != nil { - glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err) - if strings.Contains(err.Error(), "non-empty") { - return fuse.EEXIST - } - return fuse.ENOENT - } - - t := util.NewFullPath(dirFullPath, req.Name) - dir.wfs.metaCache.DeleteEntry(context.Background(), t) - - return nil - -} - -func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - - glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if req.Valid.Mode() { - entry.Attributes.FileMode = uint32(req.Mode) - } - - if req.Valid.Uid() { - entry.Attributes.Uid = req.Uid - } - - if req.Valid.Gid() { - entry.Attributes.Gid = req.Gid - } - - if req.Valid.Mtime() { - entry.Attributes.Mtime = req.Mtime.Unix() - } - - return dir.saveEntry(entry) - -} - -func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - - glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if err := setxattr(entry, req); err != nil { - return err - } - - return dir.saveEntry(entry) - -} - -func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - - glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if err := removexattr(entry, req); err != nil { - return err - } - - return dir.saveEntry(entry) - -} - -func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - - glog.V(4).Infof("dir Listxattr %s", dir.FullPath()) - - entry, err := dir.maybeLoadEntry() - if err != nil { - return err - } - - if err := listxattr(entry, req, resp); err != nil { - return err - } - - return nil - -} - -func (dir *Dir) Forget() { - glog.V(4).Infof("Forget dir %s", dir.FullPath()) -} - -func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) { - parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.maybeLoadEntry(parentDirPath, name) -} - -func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { - - parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - - return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(entry) - defer dir.wfs.mapPbIdFromFilerToLocal(entry) - - request := &filer_pb.UpdateEntryRequest{ - Directory: parentDir, - Entry: entry, - Signatures: []int32{dir.wfs.signature}, - } - - glog.V(1).Infof("save dir entry: %v", request) - _, err := client.UpdateEntry(context.Background(), request) - if err != nil { - glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) - return fuse.EIO - } - - if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { - glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) - return fuse.ESTALE - } - - return nil - }) -} - -func (dir *Dir) FullPath() string { - var parts []string - for p := dir; p != nil; p = p.parent { - if strings.HasPrefix(p.name, "/") { - if len(p.name) > 1 { - parts = append(parts, p.name[1:]) - } - } else { - parts = append(parts, p.name) - } - } - - if len(parts) == 0 { - return "/" - } - - var buf bytes.Buffer - for i := len(parts) - 1; i >= 0; i-- { - buf.WriteString("/") - buf.WriteString(parts[i]) - } - return buf.String() -} diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go deleted file mode 100644 index acdcd2de4..000000000 --- a/weed/filesys/dir_link.go +++ /dev/null @@ -1,160 +0,0 @@ -package filesys - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/util" - "os" - "syscall" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" -) - -var _ = fs.NodeLinker(&Dir{}) -var _ = fs.NodeSymlinker(&Dir{}) -var _ = fs.NodeReadlinker(&File{}) - -const ( - HARD_LINK_MARKER = '\x01' -) - -func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) { - - oldFile, ok := old.(*File) - if !ok { - glog.Errorf("old node is not a file: %+v", old) - } - - glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName) - - oldEntry, err := oldFile.maybeLoadEntry(ctx) - if err != nil { - return nil, err - } - - if oldEntry == nil { - return nil, fuse.EIO - } - - // update old file to hardlink mode - if len(oldEntry.HardLinkId) == 0 { - oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) - oldEntry.HardLinkCounter = 1 - } - oldEntry.HardLinkCounter++ - updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ - Directory: oldFile.dir.FullPath(), - Entry: oldEntry, - Signatures: []int32{dir.wfs.signature}, - } - - // CreateLink 1.2 : update new file to hardlink mode - request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: &filer_pb.Entry{ - Name: req.NewName, - IsDirectory: false, - Attributes: oldEntry.Attributes, - Chunks: oldEntry.Chunks, - Extended: oldEntry.Extended, - HardLinkId: oldEntry.HardLinkId, - HardLinkCounter: oldEntry.HardLinkCounter, - }, - Signatures: []int32{dir.wfs.signature}, - } - - // apply changes to the filer, and also apply to local metaCache - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil { - glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) - return fuse.EIO - } - dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry)) - - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) - return fuse.EIO - } - dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - return nil - }) - - if err != nil { - return nil, fuse.EIO - } - - // create new file node - newNode := dir.newFile(req.NewName) - newFile := newNode.(*File) - - return newFile, err - -} - -func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) { - - glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target) - - request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: &filer_pb.Entry{ - Name: req.NewName, - IsDirectory: false, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32((os.FileMode(0777) | os.ModeSymlink) &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - SymlinkTarget: req.Target, - }, - }, - Signatures: []int32{dir.wfs.signature}, - } - - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - dir.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err) - return fuse.EIO - } - - dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - return nil - }) - - symlink := dir.newFile(req.NewName) - - return symlink, err - -} - -func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) { - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return "", err - } - - if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 { - return "", fuse.Errno(syscall.EINVAL) - } - - glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget) - - return entry.Attributes.SymlinkTarget, nil - -} diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go deleted file mode 100644 index dd76577b0..000000000 --- a/weed/filesys/dir_rename.go +++ /dev/null @@ -1,174 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "math" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { - - newDir := newDirectory.(*Dir) - - newPath := util.NewFullPath(newDir.FullPath(), req.NewName) - oldPath := util.NewFullPath(dir.FullPath(), req.OldName) - - glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) - - // find local old entry - oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath) - if err != nil { - glog.Errorf("dir Rename can not find source %s : %v", oldPath, err) - return fuse.ENOENT - } - - // update remote filer - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - request := &filer_pb.AtomicRenameEntryRequest{ - OldDirectory: dir.FullPath(), - OldName: req.OldName, - NewDirectory: newDir.FullPath(), - NewName: req.NewName, - Signatures: []int32{dir.wfs.signature}, - } - - _, err := client.AtomicRenameEntry(ctx, request) - if err != nil { - glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err) - return fuse.EXDEV - } - - return nil - - }) - if err != nil { - glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName) - if err != nil { - glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - return nil -} - -func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { - - oldName := entry.Name() - - oldPath := oldParent.Child(oldName) - newPath := newParent.Child(newName) - if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { - - oldFsNode := NodeWithId(oldPath.AsInode()) - newFsNode := NodeWithId(newPath.AsInode()) - newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode())) - var newDir *Dir - if found { - newDir = newDirNode.(*Dir) - } - dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) { - if file, ok := internalNode.(*File); ok { - glog.V(4).Infof("internal file node %s", oldParent.Child(oldName)) - file.Name = newName - file.id = uint64(newFsNode) - if found { - file.dir = newDir - } - } - if dir, ok := internalNode.(*Dir); ok { - glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName)) - dir.name = newName - dir.id = uint64(newFsNode) - if found { - dir.parent = newDir - } - } - }) - - // change file handle - inodeId := oldPath.AsInode() - dir.wfs.handlesLock.Lock() - if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle == nil { - glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath) - delete(dir.wfs.handles, inodeId) - dir.wfs.handles[newPath.AsInode()] = existingHandle - } - dir.wfs.handlesLock.Unlock() - - if entry.IsDirectory() { - if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil { - return err - } - } - return nil - }); err != nil { - return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err) - } - - return nil -} - -func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error { - - currentDirPath := oldParent.Child(oldName) - newDirPath := newParent.Child(newName) - - glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath) - - var moveErr error - listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool { - moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name()) - if moveErr != nil { - return false - } - return true - }) - if listErr != nil { - return listErr - } - if moveErr != nil { - return moveErr - } - - return nil -} - -func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error { - - newPath := newParent.Child(newName) - oldPath := oldParent.Child(entry.Name()) - - entry.FullPath = newPath - if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil { - glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - if moveFolderSubEntries != nil { - if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { - return moveChildrenErr - } - } - - if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil { - glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - return nil -} diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go deleted file mode 100644 index 304793340..000000000 --- a/weed/filesys/dirty_page_interval.go +++ /dev/null @@ -1,222 +0,0 @@ -package filesys - -import ( - "io" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type IntervalNode struct { - Data []byte - Offset int64 - Size int64 - Next *IntervalNode -} - -type IntervalLinkedList struct { - Head *IntervalNode - Tail *IntervalNode -} - -type ContinuousIntervals struct { - lists []*IntervalLinkedList -} - -func (list *IntervalLinkedList) Offset() int64 { - return list.Head.Offset -} -func (list *IntervalLinkedList) Size() int64 { - return list.Tail.Offset + list.Tail.Size - list.Head.Offset -} -func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) { - // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) - list.Tail.Next = node - list.Tail = node -} -func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) { - // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) - node.Next = list.Head - list.Head = node -} - -func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) { - t := list.Head - for { - - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart < nodeStop { - // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop) - copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset]) - } - - if t.Next == nil { - break - } - t = t.Next - } -} - -func (c *ContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { - total += list.Size() - } - return -} - -func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { - var nodes []*IntervalNode - for t := list.Head; t != nil; t = t.Next { - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart >= nodeStop { - // skip non overlapping IntervalNode - continue - } - nodes = append(nodes, &IntervalNode{ - Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], - Offset: nodeStart, - Size: nodeStop - nodeStart, - Next: nil, - }) - } - for i := 1; i < len(nodes); i++ { - nodes[i-1].Next = nodes[i] - } - return &IntervalLinkedList{ - Head: nodes[0], - Tail: nodes[len(nodes)-1], - } -} - -func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { - - interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} - - // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] - if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset { - lastSpan.addNodeToTail(interval) - return - } - } - - var newLists []*IntervalLinkedList - for _, list := range c.lists { - // if list is to the left of new interval, add to the new list - if list.Tail.Offset+list.Tail.Size <= interval.Offset { - newLists = append(newLists, list) - } - // if list is to the right of new interval, add to the new list - if interval.Offset+interval.Size <= list.Head.Offset { - newLists = append(newLists, list) - } - // if new interval overwrite the right part of the list - if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size { - // create a new list of the left part of existing list - newLists = append(newLists, subList(list, list.Offset(), interval.Offset)) - } - // if new interval overwrite the left part of the list - if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size { - // create a new list of the right part of existing list - newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size)) - } - // skip anything that is fully overwritten by the new interval - } - - c.lists = newLists - // add the new interval to the lists, connecting neighbor lists - var prevList, nextList *IntervalLinkedList - - for _, list := range c.lists { - if list.Head.Offset == interval.Offset+interval.Size { - nextList = list - break - } - } - - for _, list := range c.lists { - if list.Head.Offset+list.Size() == offset { - list.addNodeToTail(interval) - prevList = list - break - } - } - - if prevList != nil && nextList != nil { - // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size) - prevList.Tail.Next = nextList.Head - prevList.Tail = nextList.Tail - c.removeList(nextList) - } else if nextList != nil { - // add to head was not done when checking - nextList.addNodeToHead(interval) - } - if prevList == nil && nextList == nil { - c.lists = append(c.lists, &IntervalLinkedList{ - Head: interval, - Tail: interval, - }) - } - - return -} - -func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList { - var maxSize int64 - maxIndex := -1 - for k, list := range c.lists { - if maxSize <= list.Size() { - maxSize = list.Size() - maxIndex = k - } - } - if maxSize <= 0 { - return nil - } - - t := c.lists[maxIndex] - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) - return t - -} - -func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) { - index := -1 - for k, list := range c.lists { - if list.Offset() == target.Offset() { - index = k - } - } - if index < 0 { - return - } - - c.lists = append(c.lists[0:index], c.lists[index+1:]...) - -} - -func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { - start := max(startOffset, list.Offset()) - stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) - if start < stop { - list.ReadData(data[start-startOffset:], start, stop) - maxStop = max(maxStop, stop) - } - } - return -} - -func (l *IntervalLinkedList) ToReader() io.Reader { - var readers []io.Reader - t := l.Head - readers = append(readers, util.NewBytesReader(t.Data)) - for t.Next != nil { - t = t.Next - readers = append(readers, util.NewBytesReader(t.Data)) - } - if len(readers) == 1 { - return readers[0] - } - return io.MultiReader(readers...) -} diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go deleted file mode 100644 index d02ad27fd..000000000 --- a/weed/filesys/dirty_page_interval_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package filesys - -import ( - "bytes" - "math/rand" - "testing" -) - -func TestContinuousIntervals_AddIntervalAppend(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25 - c.AddInterval(getBytes(25, 3), 0) - // _, _, 23, 23, 23, 23 - c.AddInterval(getBytes(23, 4), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 23, 23) - -} - -func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25, 25, 25 - c.AddInterval(getBytes(25, 5), 0) - // _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 25) - -} - -func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 1, - c.AddInterval(getBytes(1, 1), 0) - // _, 2, - c.AddInterval(getBytes(2, 1), 1) - // _, _, 3, 3, 3 - c.AddInterval(getBytes(3, 3), 2) - // _, _, _, 4, 4, 4 - c.AddInterval(getBytes(4, 3), 3) - - expectedData(t, c, 0, 1, 2, 3, 4, 4, 4) - -} - -func TestContinuousIntervals_RealCase1(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, - c.AddInterval(getBytes(25, 1), 0) - // _, _, _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 4) - // _, _, _, 24, 24, 24, 24 - c.AddInterval(getBytes(24, 4), 3) - - // _, 22, 22 - c.AddInterval(getBytes(22, 2), 1) - - expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24) - -} - -func TestRandomWrites(t *testing.T) { - - c := &ContinuousIntervals{} - - data := make([]byte, 1024) - - for i := 0; i < 1024; i++ { - - start, stop := rand.Intn(len(data)), rand.Intn(len(data)) - if start > stop { - start, stop = stop, start - } - - rand.Read(data[start : stop+1]) - - c.AddInterval(data[start:stop+1], int64(start)) - - expectedData(t, c, 0, data...) - - } - -} - -func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) { - start, stop := int64(offset), int64(offset+len(data)) - for _, list := range c.lists { - nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size()) - if nodeStart < nodeStop { - buf := make([]byte, nodeStop-nodeStart) - list.ReadData(buf, nodeStart, nodeStop) - if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 { - t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf) - } - } - } -} - -func getBytes(content byte, length int) []byte { - data := make([]byte, length) - for i := 0; i < length; i++ { - data[i] = content - } - return data -} diff --git a/weed/filesys/dirty_pages.go b/weed/filesys/dirty_pages.go deleted file mode 100644 index 8505323ef..000000000 --- a/weed/filesys/dirty_pages.go +++ /dev/null @@ -1,10 +0,0 @@ -package filesys - -type DirtyPages interface { - AddPage(offset int64, data []byte) - FlushData() error - ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) - GetStorageOptions() (collection, replication string) - SetWriteOnly(writeOnly bool) - GetWriteOnly() (writeOnly bool) -} diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go deleted file mode 100644 index b7514a2eb..000000000 --- a/weed/filesys/dirty_pages_continuous.go +++ /dev/null @@ -1,160 +0,0 @@ -package filesys - -import ( - "bytes" - "fmt" - "io" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type ContinuousDirtyPages struct { - intervals *ContinuousIntervals - f *File - writeOnly bool - writeWaitGroup sync.WaitGroup - chunkAddLock sync.Mutex - lastErr error - collection string - replication string -} - -func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages { - dirtyPages := &ContinuousDirtyPages{ - intervals: &ContinuousIntervals{}, - f: file, - writeOnly: writeOnly, - } - return dirtyPages -} - -func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - - glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) - - if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { - // this is more than what buffer can hold. - pages.flushAndSave(offset, data) - } - - pages.intervals.AddInterval(data, offset) - - if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit { - pages.saveExistingLargestPageToStorage() - } - - return -} - -func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) { - - // flush existing - pages.saveExistingPagesToStorage() - - // flush the new page - pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))) - - return -} - -func (pages *ContinuousDirtyPages) FlushData() error { - - pages.saveExistingPagesToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - return nil -} - -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() { - for pages.saveExistingLargestPageToStorage() { - } -} - -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) { - - maxList := pages.intervals.RemoveLargestIntervalLinkedList() - if maxList == nil { - return false - } - - entry := pages.f.getEntry() - if entry == nil { - return false - } - - fileSize := int64(entry.Attributes.FileSize) - - chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) - if chunkSize == 0 { - return false - } - - pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) - - return true -} - -func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } -} - -func max(x, y int64) int64 { - if x > y { - return x - } - return y -} -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} - -func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.intervals.ReadDataAt(data, startOffset) -} - -func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) { - if pages.writeOnly { - pages.writeOnly = writeOnly - } -} - -func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) { - return pages.writeOnly -} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go deleted file mode 100644 index 9fa7c0c8e..000000000 --- a/weed/filesys/dirty_pages_temp_file.go +++ /dev/null @@ -1,157 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "os" - "sync" - "time" -) - -type TempFileDirtyPages struct { - f *File - tf *os.File - writtenIntervals *WrittenContinuousIntervals - writeOnly bool - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string -} - -func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages { - - tempFile := &TempFileDirtyPages{ - f: file, - writeOnly: writeOnly, - writtenIntervals: &WrittenContinuousIntervals{}, - } - - return tempFile -} - -func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - if pages.tf == nil { - tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "") - if err != nil { - glog.Errorf("create temp file: %v", err) - pages.lastErr = err - return - } - pages.tf = tf - pages.writtenIntervals.tempFile = tf - pages.writtenIntervals.lastOffset = 0 - } - - writtenOffset := pages.writtenIntervals.lastOffset - dataSize := int64(len(data)) - - // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize) - - if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil { - pages.lastErr = err - } else { - pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) - pages.writtenIntervals.lastOffset += dataSize - } - - // pages.writtenIntervals.debug() - - return -} - -func (pages *TempFileDirtyPages) FlushData() error { - - pages.saveExistingPagesToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - if pages.tf != nil { - - pages.writtenIntervals.tempFile = nil - pages.writtenIntervals.lists = nil - - pages.tf.Close() - os.Remove(pages.tf.Name()) - pages.tf = nil - } - return nil -} - -func (pages *TempFileDirtyPages) saveExistingPagesToStorage() { - - pageSize := pages.f.wfs.option.ChunkSizeLimit - - // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) - - for _, list := range pages.writtenIntervals.lists { - listStopOffset := list.Offset() + list.Size() - for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { - start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) - if start >= stop { - continue - } - // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists)) - pages.saveToStorage(list.ToReader(start, stop), start, stop-start) - } - } - -} - -func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } -} - -func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.writtenIntervals.ReadDataAt(data, startOffset) -} - -func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) { - if pages.writeOnly { - pages.writeOnly = writeOnly - } -} - -func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) { - return pages.writeOnly -} diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go deleted file mode 100644 index 42c4b5a3b..000000000 --- a/weed/filesys/dirty_pages_temp_interval.go +++ /dev/null @@ -1,289 +0,0 @@ -package filesys - -import ( - "io" - "log" - "os" -) - -type WrittenIntervalNode struct { - DataOffset int64 - TempOffset int64 - Size int64 - Next *WrittenIntervalNode -} - -type WrittenIntervalLinkedList struct { - tempFile *os.File - Head *WrittenIntervalNode - Tail *WrittenIntervalNode -} - -type WrittenContinuousIntervals struct { - tempFile *os.File - lastOffset int64 - lists []*WrittenIntervalLinkedList -} - -func (list *WrittenIntervalLinkedList) Offset() int64 { - return list.Head.DataOffset -} -func (list *WrittenIntervalLinkedList) Size() int64 { - return list.Tail.DataOffset + list.Tail.Size - list.Head.DataOffset -} -func (list *WrittenIntervalLinkedList) addNodeToTail(node *WrittenIntervalNode) { - // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) - if list.Tail.TempOffset+list.Tail.Size == node.TempOffset { - // already connected - list.Tail.Size += node.Size - } else { - list.Tail.Next = node - list.Tail = node - } -} -func (list *WrittenIntervalLinkedList) addNodeToHead(node *WrittenIntervalNode) { - // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) - node.Next = list.Head - list.Head = node -} - -func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) { - t := list.Head - for { - - nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size) - if nodeStart < nodeStop { - // glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop) - list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset) - } - - if t.Next == nil { - break - } - t = t.Next - } -} - -func (c *WrittenContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { - total += list.Size() - } - return -} - -func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenIntervalLinkedList { - var nodes []*WrittenIntervalNode - for t := list.Head; t != nil; t = t.Next { - nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size) - if nodeStart >= nodeStop { - // skip non overlapping WrittenIntervalNode - continue - } - nodes = append(nodes, &WrittenIntervalNode{ - TempOffset: t.TempOffset + nodeStart - t.DataOffset, - DataOffset: nodeStart, - Size: nodeStop - nodeStart, - Next: nil, - }) - } - for i := 1; i < len(nodes); i++ { - nodes[i-1].Next = nodes[i] - } - return &WrittenIntervalLinkedList{ - tempFile: list.tempFile, - Head: nodes[0], - Tail: nodes[len(nodes)-1], - } -} - -func (c *WrittenContinuousIntervals) debug() { - log.Printf("++") - for _, l := range c.lists { - log.Printf("++++") - for t := l.Head; ; t = t.Next { - log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) - if t.Next == nil { - break - } - } - log.Printf("----") - } - log.Printf("--") -} - -func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) { - - interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)} - - // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] - if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset { - lastSpan.addNodeToTail(interval) - return - } - } - - var newLists []*WrittenIntervalLinkedList - for _, list := range c.lists { - // if list is to the left of new interval, add to the new list - if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset { - newLists = append(newLists, list) - } - // if list is to the right of new interval, add to the new list - if interval.DataOffset+interval.Size <= list.Head.DataOffset { - newLists = append(newLists, list) - } - // if new interval overwrite the right part of the list - if list.Head.DataOffset < interval.DataOffset && interval.DataOffset < list.Tail.DataOffset+list.Tail.Size { - // create a new list of the left part of existing list - newLists = append(newLists, list.subList(list.Offset(), interval.DataOffset)) - } - // if new interval overwrite the left part of the list - if list.Head.DataOffset < interval.DataOffset+interval.Size && interval.DataOffset+interval.Size < list.Tail.DataOffset+list.Tail.Size { - // create a new list of the right part of existing list - newLists = append(newLists, list.subList(interval.DataOffset+interval.Size, list.Tail.DataOffset+list.Tail.Size)) - } - // skip anything that is fully overwritten by the new interval - } - - c.lists = newLists - // add the new interval to the lists, connecting neighbor lists - var prevList, nextList *WrittenIntervalLinkedList - - for _, list := range c.lists { - if list.Head.DataOffset == interval.DataOffset+interval.Size { - nextList = list - break - } - } - - for _, list := range c.lists { - if list.Head.DataOffset+list.Size() == dataOffset { - list.addNodeToTail(interval) - prevList = list - break - } - } - - if prevList != nil && nextList != nil { - // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size) - prevList.Tail.Next = nextList.Head - prevList.Tail = nextList.Tail - c.removeList(nextList) - } else if nextList != nil { - // add to head was not done when checking - nextList.addNodeToHead(interval) - } - if prevList == nil && nextList == nil { - c.lists = append(c.lists, &WrittenIntervalLinkedList{ - tempFile: c.tempFile, - Head: interval, - Tail: interval, - }) - } - - return -} - -func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList { - var maxSize int64 - maxIndex := -1 - for k, list := range c.lists { - if maxSize <= list.Size() { - maxSize = list.Size() - maxIndex = k - } - } - if maxSize <= 0 { - return nil - } - - t := c.lists[maxIndex] - t.tempFile = c.tempFile - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) - return t - -} - -func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) { - index := -1 - for k, list := range c.lists { - if list.Offset() == target.Offset() { - index = k - } - } - if index < 0 { - return - } - - c.lists = append(c.lists[0:index], c.lists[index+1:]...) - -} - -func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { - start := max(startOffset, list.Offset()) - stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) - if start < stop { - list.ReadData(data[start-startOffset:], start, stop) - maxStop = max(maxStop, stop) - } - } - return -} - -func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader { - // TODO: optimize this to avoid another loop - var readers []io.Reader - for t := l.Head; ; t = t.Next { - startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop) - if startOffset < stopOffset { - // glog.V(4).Infof("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) - readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset)) - } - if t.Next == nil { - break - } - } - if len(readers) == 1 { - return readers[0] - } - return io.MultiReader(readers...) -} - -type FileSectionReader struct { - file *os.File - tempStartOffset int64 - Offset int64 - dataStart int64 - dataStop int64 -} - -var _ = io.Reader(&FileSectionReader{}) - -func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader { - return &FileSectionReader{ - file: tempfile, - tempStartOffset: offset, - Offset: offset, - dataStart: dataOffset, - dataStop: dataOffset + size, - } -} - -func (f *FileSectionReader) Read(p []byte) (n int, err error) { - remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset) - if remaining <= 0 { - return 0, io.EOF - } - dataLen := min(remaining, int64(len(p))) - // glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart) - n, err = f.file.ReadAt(p[:dataLen], f.Offset) - if n > 0 { - f.Offset += int64(n) - } else { - err = io.EOF - } - return -} diff --git a/weed/filesys/file.go b/weed/filesys/file.go deleted file mode 100644 index b990b20d1..000000000 --- a/weed/filesys/file.go +++ /dev/null @@ -1,389 +0,0 @@ -package filesys - -import ( - "context" - "os" - "sort" - "time" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -const blockSize = 512 - -var _ = fs.Node(&File{}) -var _ = fs.NodeIdentifier(&File{}) -var _ = fs.NodeOpener(&File{}) -var _ = fs.NodeFsyncer(&File{}) -var _ = fs.NodeSetattrer(&File{}) -var _ = fs.NodeGetxattrer(&File{}) -var _ = fs.NodeSetxattrer(&File{}) -var _ = fs.NodeRemovexattrer(&File{}) -var _ = fs.NodeListxattrer(&File{}) -var _ = fs.NodeForgetter(&File{}) - -type File struct { - Name string - dir *Dir - wfs *WFS - entry *filer_pb.Entry - isOpen int - dirtyMetadata bool - id uint64 -} - -func (file *File) fullpath() util.FullPath { - return util.NewFullPath(file.dir.FullPath(), file.Name) -} - -func (file *File) Id() uint64 { - return file.id -} - -func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { - - glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if entry == nil { - return fuse.ENOENT - } - - attr.Inode = file.Id() - attr.Valid = time.Second - attr.Mode = os.FileMode(entry.Attributes.FileMode) - attr.Size = filer.FileSize(entry) - if file.isOpen > 0 { - attr.Size = entry.Attributes.FileSize - glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) - } - attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) - attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - attr.Gid = entry.Attributes.Gid - attr.Uid = entry.Attributes.Uid - attr.Blocks = attr.Size/blockSize + 1 - attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit) - if entry.HardLinkCounter > 0 { - attr.Nlink = uint32(entry.HardLinkCounter) - } - - return nil - -} - -func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - - // glog.V(4).Infof("file Getxattr %s", file.fullpath()) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - return getxattr(entry, req, resp) -} - -func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { - - glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - - handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0) - - resp.Handle = fuse.HandleID(handle.handle) - - glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle) - - return handle, nil - -} - -func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - - glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if req.Valid.Size() { - - glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks)) - if req.Size < filer.FileSize(entry) { - // fmt.Printf("truncate %v \n", fullPath) - var chunks []*filer_pb.FileChunk - var truncatedChunks []*filer_pb.FileChunk - for _, chunk := range entry.Chunks { - int64Size := int64(chunk.Size) - if chunk.Offset+int64Size > int64(req.Size) { - // this chunk is truncated - int64Size = int64(req.Size) - chunk.Offset - if int64Size > 0 { - chunks = append(chunks, chunk) - glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size) - chunk.Size = uint64(int64Size) - } else { - glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString()) - truncatedChunks = append(truncatedChunks, chunk) - } - } - } - entry.Chunks = chunks - } - entry.Attributes.FileSize = req.Size - file.dirtyMetadata = true - } - - if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) { - entry.Attributes.FileMode = uint32(req.Mode) - file.dirtyMetadata = true - } - - if req.Valid.Uid() && entry.Attributes.Uid != req.Uid { - entry.Attributes.Uid = req.Uid - file.dirtyMetadata = true - } - - if req.Valid.Gid() && entry.Attributes.Gid != req.Gid { - entry.Attributes.Gid = req.Gid - file.dirtyMetadata = true - } - - if req.Valid.Crtime() { - entry.Attributes.Crtime = req.Crtime.Unix() - file.dirtyMetadata = true - } - - if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() { - entry.Attributes.Mtime = req.Mtime.Unix() - file.dirtyMetadata = true - } - - if req.Valid.Handle() { - // fmt.Printf("file handle => %d\n", req.Handle) - } - - if file.isOpen > 0 { - return nil - } - - if !file.dirtyMetadata { - return nil - } - - return file.saveEntry(entry) - -} - -func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - - glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if err := setxattr(entry, req); err != nil { - return err - } - file.dirtyMetadata = true - - if file.isOpen > 0 { - return nil - } - - return file.saveEntry(entry) - -} - -func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - - glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if err := removexattr(entry, req); err != nil { - return err - } - file.dirtyMetadata = true - - if file.isOpen > 0 { - return nil - } - - return file.saveEntry(entry) - -} - -func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - - glog.V(4).Infof("file Listxattr %s", file.fullpath()) - - entry, err := file.maybeLoadEntry(ctx) - if err != nil { - return err - } - - if err := listxattr(entry, req, resp); err != nil { - return err - } - - return nil - -} - -func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - // fsync works at OS level - // write the file chunks to the filerGrpcAddress - glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) - - return nil -} - -func (file *File) Forget() { - t := util.NewFullPath(file.dir.FullPath(), file.Name) - glog.V(4).Infof("Forget file %s", t) - file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode())) -} - -func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) { - - file.wfs.handlesLock.Lock() - handle, found := file.wfs.handles[file.Id()] - file.wfs.handlesLock.Unlock() - entry = file.entry - if found { - // glog.V(4).Infof("maybeLoadEntry found opened file %s/%s", file.dir.FullPath(), file.Name) - entry = handle.f.entry - } - - if entry != nil { - if len(entry.HardLinkId) == 0 { - // only always reload hard link - return entry, nil - } - } - entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) - if err != nil { - glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return entry, err - } - if entry != nil { - // file.entry = entry - } else { - glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err) - } - return entry, nil -} - -func lessThan(a, b *filer_pb.FileChunk) bool { - if a.Mtime == b.Mtime { - return a.Fid.FileKey < b.Fid.FileKey - } - return a.Mtime < b.Mtime -} - -func (file *File) addChunks(chunks []*filer_pb.FileChunk) { - - // find the earliest incoming chunk - newChunks := chunks - earliestChunk := newChunks[0] - for i := 1; i < len(newChunks); i++ { - if lessThan(earliestChunk, newChunks[i]) { - earliestChunk = newChunks[i] - } - } - - entry := file.getEntry() - if entry == nil { - return - } - - // pick out-of-order chunks from existing chunks - for _, chunk := range entry.Chunks { - if lessThan(earliestChunk, chunk) { - chunks = append(chunks, chunk) - } - } - - // sort incoming chunks - sort.Slice(chunks, func(i, j int) bool { - return lessThan(chunks[i], chunks[j]) - }) - - glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks)) - - entry.Chunks = append(entry.Chunks, newChunks...) -} - -func (file *File) saveEntry(entry *filer_pb.Entry) error { - return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - file.wfs.mapPbIdFromLocalToFiler(entry) - defer file.wfs.mapPbIdFromFilerToLocal(entry) - - request := &filer_pb.CreateEntryRequest{ - Directory: file.dir.FullPath(), - Entry: entry, - Signatures: []int32{file.wfs.signature}, - } - - glog.V(4).Infof("save file entry: %v", request) - _, err := client.CreateEntry(context.Background(), request) - if err != nil { - glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return fuse.EIO - } - - file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - file.dirtyMetadata = false - - return nil - }) -} - -func (file *File) getEntry() *filer_pb.Entry { - return file.entry -} - -func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { - err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.DownloadToLocalRequest{ - Directory: file.dir.FullPath(), - Name: entry.Name, - } - - glog.V(4).Infof("download entry: %v", request) - resp, err := client.DownloadToLocal(context.Background(), request) - if err != nil { - glog.Errorf("DownloadToLocal file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return fuse.EIO - } - - entry = resp.Entry - - file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry)) - - file.dirtyMetadata = false - - return nil - }) - - return entry, err -} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go deleted file mode 100644 index 34affddb9..000000000 --- a/weed/filesys/filehandle.go +++ /dev/null @@ -1,336 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "io" - "math" - "net/http" - "os" - "sync" - "time" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type FileHandle struct { - // cache file has been written to - dirtyPages DirtyPages - entryViewCache []filer.VisibleInterval - reader io.ReaderAt - contentType string - handle uint64 - sync.Mutex - - f *File - RequestId fuse.RequestID // unique ID for request - NodeId fuse.NodeID // file or directory the request is about - Uid uint32 // user ID of process making request - Gid uint32 // group ID of process making request - writeOnly bool - isDeleted bool -} - -func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle { - fh := &FileHandle{ - f: file, - // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newTempFileDirtyPages(file, writeOnly), - Uid: uid, - Gid: gid, - } - entry := fh.f.getEntry() - if entry != nil { - entry.Attributes.FileSize = filer.FileSize(entry) - } - - return fh -} - -var _ = fs.Handle(&FileHandle{}) - -// var _ = fs.HandleReadAller(&FileHandle{}) -var _ = fs.HandleReader(&FileHandle{}) -var _ = fs.HandleFlusher(&FileHandle{}) -var _ = fs.HandleWriter(&FileHandle{}) -var _ = fs.HandleReleaser(&FileHandle{}) - -func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - - glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) - fh.Lock() - defer fh.Unlock() - - if req.Size <= 0 { - return nil - } - - buff := resp.Data[:cap(resp.Data)] - if req.Size > cap(resp.Data) { - // should not happen - buff = make([]byte, req.Size) - } - - totalRead, err := fh.readFromChunks(buff, req.Offset) - if err == nil || err == io.EOF { - maxStop := fh.readFromDirtyPages(buff, req.Offset) - totalRead = max(maxStop-req.Offset, totalRead) - } - - if err == io.EOF { - err = nil - } - - if err != nil { - glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err) - return fuse.EIO - } - - if totalRead > int64(len(buff)) { - glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead) - totalRead = min(int64(len(buff)), totalRead) - } - if err == nil { - resp.Data = buff[:totalRead] - } - - return err -} - -func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { - maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) - return -} - -func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { - - entry := fh.f.getEntry() - if entry == nil { - return 0, io.EOF - } - - if entry.IsInRemoteOnly() { - glog.V(4).Infof("download remote entry %s", fh.f.fullpath()) - newEntry, err := fh.f.downloadRemoteEntry(entry) - if err != nil { - glog.V(1).Infof("download remote entry %s: %v", fh.f.fullpath(), err) - return 0, err - } - entry = newEntry - } - - fileSize := int64(filer.FileSize(entry)) - fileFullPath := fh.f.fullpath() - - if fileSize == 0 { - glog.V(1).Infof("empty fh %v", fileFullPath) - return 0, io.EOF - } - - if offset+int64(len(buff)) <= int64(len(entry.Content)) { - totalRead := copy(buff, entry.Content[offset:]) - glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) - return int64(totalRead), nil - } - - var chunkResolveErr error - if fh.entryViewCache == nil { - fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64) - if chunkResolveErr != nil { - return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) - } - fh.reader = nil - } - - reader := fh.reader - if reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) - reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) - } - fh.reader = reader - - totalRead, err := reader.ReadAt(buff, offset) - - if err != nil && err != io.EOF { - glog.Errorf("file handle read %s: %v", fileFullPath, err) - } - - // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) - - return int64(totalRead), err -} - -// Write to the file handle -func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - - fh.Lock() - defer fh.Unlock() - - // write the request to volume servers - data := req.Data - if len(data) <= 512 { - // fuse message cacheable size - data = make([]byte, len(req.Data)) - copy(data, req.Data) - } - - entry := fh.f.getEntry() - if entry == nil { - return fuse.EIO - } - - entry.Content = nil - entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize))) - // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - - fh.dirtyPages.AddPage(req.Offset, data) - - resp.Size = len(data) - - if req.Offset == 0 { - // detect mime type - fh.contentType = http.DetectContentType(data) - fh.f.dirtyMetadata = true - } - - fh.f.dirtyMetadata = true - - return nil -} - -func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { - - glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen) - - fh.Lock() - defer fh.Unlock() - - fh.f.wfs.handlesLock.Lock() - fh.f.isOpen-- - fh.f.wfs.handlesLock.Unlock() - - if fh.f.isOpen <= 0 { - fh.f.entry = nil - fh.entryViewCache = nil - fh.reader = nil - - fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) - } - - if fh.f.isOpen < 0 { - glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) - fh.f.isOpen = 0 - return nil - } - - return nil -} - -func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { - - glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle) - - if fh.isDeleted { - glog.V(4).Infof("Flush %v fh %d skip deleted", fh.f.fullpath(), fh.handle) - return nil - } - - fh.Lock() - defer fh.Unlock() - - if err := fh.doFlush(ctx, req.Header); err != nil { - glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err) - return err - } - - glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle) - return nil -} - -func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { - // flush works at fh level - // send the data to the OS - glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle) - - if err := fh.dirtyPages.FlushData(); err != nil { - glog.Errorf("%v doFlush: %v", fh.f.fullpath(), err) - return fuse.EIO - } - - if !fh.f.dirtyMetadata { - return nil - } - - err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - entry := fh.f.getEntry() - if entry == nil { - return nil - } - - if entry.Attributes != nil { - entry.Attributes.Mime = fh.contentType - if entry.Attributes.Uid == 0 { - entry.Attributes.Uid = header.Uid - } - if entry.Attributes.Gid == 0 { - entry.Attributes.Gid = header.Gid - } - if entry.Attributes.Crtime == 0 { - entry.Attributes.Crtime = time.Now().Unix() - } - entry.Attributes.Mtime = time.Now().Unix() - entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) - entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions() - } - - request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.FullPath(), - Entry: entry, - Signatures: []int32{fh.f.wfs.signature}, - } - - glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks)) - for i, chunk := range entry.Chunks { - glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) - } - - manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) - - chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) - chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks) - if manifestErr != nil { - // not good, but should be ok - glog.V(0).Infof("MaybeManifestize: %v", manifestErr) - } - entry.Chunks = append(chunks, manifestChunks...) - - fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry) - defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry) - - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) - return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) - } - - fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) - - return nil - }) - - if err == nil { - fh.f.dirtyMetadata = false - } - - if err != nil { - glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err) - return fuse.EIO - } - - return nil -} diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go deleted file mode 100644 index 6b1012090..000000000 --- a/weed/filesys/fscache.go +++ /dev/null @@ -1,213 +0,0 @@ -package filesys - -import ( - "sync" - - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type FsCache struct { - root *FsNode - sync.RWMutex -} -type FsNode struct { - parent *FsNode - node fs.Node - name string - childrenLock sync.RWMutex - children map[string]*FsNode -} - -func newFsCache(root fs.Node) *FsCache { - return &FsCache{ - root: &FsNode{ - node: root, - }, - } -} - -func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { - - c.RLock() - defer c.RUnlock() - - return c.doGetFsNode(path) -} - -func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { - t := c.root - for _, p := range path.Split() { - t = t.findChild(p) - if t == nil { - return nil - } - } - return t.node -} - -func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { - - c.Lock() - defer c.Unlock() - - c.doSetFsNode(path, node) -} - -func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { - t := c.root - for _, p := range path.Split() { - t = t.ensureChild(p) - } - t.node = node -} - -func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { - - c.Lock() - defer c.Unlock() - - t := c.doGetFsNode(path) - if t != nil { - return t - } - t = genNodeFn() - c.doSetFsNode(path, t) - return t -} - -func (c *FsCache) DeleteFsNode(path util.FullPath) { - - c.Lock() - defer c.Unlock() - - t := c.root - for _, p := range path.Split() { - t = t.findChild(p) - if t == nil { - return - } - } - if t.parent != nil { - t.parent.disconnectChild(t) - } - t.deleteSelf() -} - -// oldPath and newPath are full path including the new name -func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { - - c.Lock() - defer c.Unlock() - - // find old node - src := c.root - for _, p := range oldPath.Split() { - src = src.findChild(p) - if src == nil { - return src - } - } - if src.parent != nil { - src.parent.disconnectChild(src) - } - - // find new node - target := c.root - for _, p := range newPath.Split() { - target = target.ensureChild(p) - } - parent := target.parent - if dir, ok := src.node.(*Dir); ok { - dir.name = target.name // target is not Dir, but a shortcut - } - if f, ok := src.node.(*File); ok { - f.Name = target.name - entry := f.getEntry() - if entry != nil { - entry.Name = f.Name - } - } - parent.disconnectChild(target) - - target.deleteSelf() - - src.name = target.name - src.connectToParent(parent) - - return src -} - -func (n *FsNode) connectToParent(parent *FsNode) { - n.parent = parent - oldNode := parent.findChild(n.name) - if oldNode != nil { - oldNode.deleteSelf() - } - if dir, ok := n.node.(*Dir); ok { - if parent.node != nil { - dir.parent = parent.node.(*Dir) - } - } - if f, ok := n.node.(*File); ok { - if parent.node != nil { - f.dir = parent.node.(*Dir) - } - } - n.childrenLock.Lock() - parent.children[n.name] = n - n.childrenLock.Unlock() -} - -func (n *FsNode) findChild(name string) *FsNode { - n.childrenLock.RLock() - defer n.childrenLock.RUnlock() - - child, found := n.children[name] - if found { - return child - } - return nil -} - -func (n *FsNode) ensureChild(name string) *FsNode { - n.childrenLock.Lock() - defer n.childrenLock.Unlock() - - if n.children == nil { - n.children = make(map[string]*FsNode) - } - child, found := n.children[name] - if found { - return child - } - t := &FsNode{ - parent: n, - node: nil, - name: name, - children: nil, - } - n.children[name] = t - return t -} - -func (n *FsNode) disconnectChild(child *FsNode) { - n.childrenLock.Lock() - delete(n.children, child.name) - n.childrenLock.Unlock() - child.parent = nil -} - -func (n *FsNode) deleteSelf() { - n.childrenLock.Lock() - for _, child := range n.children { - child.deleteSelf() - } - n.children = nil - n.childrenLock.Unlock() - - n.node = nil - n.parent = nil - -} diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go deleted file mode 100644 index 1152eb32e..000000000 --- a/weed/filesys/fscache_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package filesys - -import ( - "testing" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -func TestPathSplit(t *testing.T) { - parts := util.FullPath("/").Split() - if len(parts) != 0 { - t.Errorf("expecting an empty list, but getting %d", len(parts)) - } - - parts = util.FullPath("/readme.md").Split() - if len(parts) != 1 { - t.Errorf("expecting an empty list, but getting %d", len(parts)) - } - -} - -func TestFsCache(t *testing.T) { - - cache := newFsCache(nil) - - x := cache.GetFsNode(util.FullPath("/y/x")) - if x != nil { - t.Errorf("wrong node!") - } - - p := util.FullPath("/a/b/c") - cache.SetFsNode(p, &File{Name: "cc"}) - tNode := cache.GetFsNode(p) - tFile := tNode.(*File) - if tFile.Name != "cc" { - t.Errorf("expecting a FsNode") - } - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"}) - cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) - cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) - - b := cache.GetFsNode(util.FullPath("/a/b")) - if b != nil { - t.Errorf("unexpected node!") - } - - a := cache.GetFsNode(util.FullPath("/a")) - if a == nil { - t.Errorf("missing node!") - } - - cache.DeleteFsNode(util.FullPath("/a")) - if b != nil { - t.Errorf("unexpected node!") - } - - a = cache.GetFsNode(util.FullPath("/a")) - if a != nil { - t.Errorf("wrong DeleteFsNode!") - } - - z := cache.GetFsNode(util.FullPath("/z")) - if z == nil { - t.Errorf("missing node!") - } - - y := cache.GetFsNode(util.FullPath("/x/y")) - if y != nil { - t.Errorf("wrong node!") - } - -} - -func TestFsCacheMove(t *testing.T) { - - cache := newFsCache(nil) - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) - cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) - - cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) - - d := cache.GetFsNode(util.FullPath("/z/x/d")) - if d == nil { - t.Errorf("unexpected nil node!") - } - if d.(*File).Name != "dd" { - t.Errorf("unexpected non dd node!") - } - -} - -func TestFsCacheMove2(t *testing.T) { - - cache := newFsCache(nil) - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - - cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e")) - - d := cache.GetFsNode(util.FullPath("/a/b/e")) - if d == nil { - t.Errorf("unexpected nil node!") - } - if d.(*File).Name != "e" { - t.Errorf("unexpected node!") - } - -} diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go deleted file mode 100644 index e6593ebde..000000000 --- a/weed/filesys/meta_cache/cache_config.go +++ /dev/null @@ -1,32 +0,0 @@ -package meta_cache - -import "github.com/chrislusf/seaweedfs/weed/util" - -var ( - _ = util.Configuration(&cacheConfig{}) -) - -// implementing util.Configuraion -type cacheConfig struct { - dir string -} - -func (c cacheConfig) GetString(key string) string { - return c.dir -} - -func (c cacheConfig) GetBool(key string) bool { - panic("implement me") -} - -func (c cacheConfig) GetInt(key string) int { - panic("implement me") -} - -func (c cacheConfig) GetStringSlice(key string) []string { - panic("implement me") -} - -func (c cacheConfig) SetDefault(key string, value interface{}) { - panic("implement me") -} diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go deleted file mode 100644 index 4a2179f31..000000000 --- a/weed/filesys/meta_cache/id_mapper.go +++ /dev/null @@ -1,101 +0,0 @@ -package meta_cache - -import ( - "fmt" - "strconv" - "strings" -) - -type UidGidMapper struct { - uidMapper *IdMapper - gidMapper *IdMapper -} - -type IdMapper struct { - localToFiler map[uint32]uint32 - filerToLocal map[uint32]uint32 -} - -// UidGidMapper translates local uid/gid to filer uid/gid -// The local storage always persists the same as the filer. -// The local->filer translation happens when updating the filer first and later saving to meta_cache. -// And filer->local happens when reading from the meta_cache. -func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { - uidMapper, err := newIdMapper(uidPairsStr) - if err != nil { - return nil, err - } - gidMapper, err := newIdMapper(gidPairStr) - if err != nil { - return nil, err - } - - return &UidGidMapper{ - uidMapper: uidMapper, - gidMapper: gidMapper, - }, nil -} - -func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { - return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) -} -func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { - return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) -} - -func (m *IdMapper) LocalToFiler(id uint32) uint32 { - value, found := m.localToFiler[id] - if found { - return value - } - return id -} -func (m *IdMapper) FilerToLocal(id uint32) uint32 { - value, found := m.filerToLocal[id] - if found { - return value - } - return id -} - -func newIdMapper(pairsStr string) (*IdMapper, error) { - - localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) - if err != nil { - return nil, err - } - - return &IdMapper{ - localToFiler: localToFiler, - filerToLocal: filerToLocal, - }, nil - -} - -func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { - - if pairsStr == "" { - return - } - - localToFiler = make(map[uint32]uint32) - filerToLocal = make(map[uint32]uint32) - for _, pairStr := range strings.Split(pairsStr, ",") { - pair := strings.Split(pairStr, ":") - localUidStr, filerUidStr := pair[0], pair[1] - localUid, localUidErr := strconv.Atoi(localUidStr) - if localUidErr != nil { - err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) - return - } - filerUid, filerUidErr := strconv.Atoi(filerUidStr) - if filerUidErr != nil { - err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) - return - } - localToFiler[uint32(localUid)] = uint32(filerUid) - filerToLocal[uint32(filerUid)] = uint32(localUid) - } - - return -} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go deleted file mode 100644 index 69d1655ee..000000000 --- a/weed/filesys/meta_cache/meta_cache.go +++ /dev/null @@ -1,146 +0,0 @@ -package meta_cache - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filer/leveldb" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" - "os" -) - -// need to have logic similar to FilerStoreWrapper -// e.g. fill fileId field for chunks - -type MetaCache struct { - localStore filer.VirtualFilerStore - // sync.RWMutex - visitedBoundary *bounded_tree.BoundedTree - uidGidMapper *UidGidMapper - invalidateFunc func(util.FullPath) -} - -func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache { - return &MetaCache{ - localStore: openMetaStore(dbFolder), - visitedBoundary: bounded_tree.NewBoundedTree(baseDir), - uidGidMapper: uidGidMapper, - invalidateFunc: func(fullpath util.FullPath) { - invalidateFunc(fullpath) - }, - } -} - -func openMetaStore(dbFolder string) filer.VirtualFilerStore { - - os.RemoveAll(dbFolder) - os.MkdirAll(dbFolder, 0755) - - store := &leveldb.LevelDBStore{} - config := &cacheConfig{ - dir: dbFolder, - } - - if err := store.Initialize(config, ""); err != nil { - glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) - } - - return filer.NewFilerStoreWrapper(store) - -} - -func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - return mc.doInsertEntry(ctx, entry) -} - -func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error { - return mc.localStore.InsertEntry(ctx, entry) -} - -func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - - oldDir, _ := oldPath.DirAndName() - if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { - if oldPath != "" { - if newEntry != nil && oldPath == newEntry.FullPath { - // skip the unnecessary deletion - // leave the update to the following InsertEntry operation - } else { - glog.V(3).Infof("DeleteEntry %s", oldPath) - if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { - return err - } - } - } - } else { - // println("unknown old directory:", oldDir) - } - - if newEntry != nil { - newDir, _ := newEntry.DirAndName() - if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { - glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) - if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { - return err - } - } - } - return nil -} - -func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.UpdateEntry(ctx, entry) -} - -func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { - //mc.RLock() - //defer mc.RUnlock() - entry, err = mc.localStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - mc.mapIdFromFilerToLocal(entry) - return -} - -func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.DeleteEntry(ctx, fp) -} - -func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { - //mc.RLock() - //defer mc.RUnlock() - - if !mc.visitedBoundary.HasVisited(dirPath) { - return fmt.Errorf("unsynchronized dir: %v", dirPath) - } - - _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { - mc.mapIdFromFilerToLocal(entry) - return eachEntryFunc(entry) - }) - if err != nil { - return err - } - return err -} - -func (mc *MetaCache) Shutdown() { - //mc.Lock() - //defer mc.Unlock() - mc.localStore.Shutdown() -} - -func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) { - entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid) -} diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go deleted file mode 100644 index 07098bf6b..000000000 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ /dev/null @@ -1,47 +0,0 @@ -package meta_cache - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { - - return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { - - glog.V(4).Infof("ReadDirAllEntries %s ...", path) - - util.Retry("ReadDirAllEntries", func() error { - err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { - entry := filer.FromPbEntry(string(path), pbEntry) - if IsHiddenSystemEntry(string(path), entry.Name()) { - return nil - } - if err := mc.doInsertEntry(context.Background(), entry); err != nil { - glog.V(0).Infof("read %s: %v", entry.FullPath, err) - return err - } - if entry.IsDirectory() { - childDirectories = append(childDirectories, entry.Name()) - } - return nil - }) - return err - }) - - if err != nil { - err = fmt.Errorf("list %s: %v", path, err) - } - - return - }) -} - -func IsHiddenSystemEntry(dir, name string) bool { - return dir == "/" && (name == "topics" || name == "etc") -} diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go deleted file mode 100644 index 2099cf1f8..000000000 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ /dev/null @@ -1,68 +0,0 @@ -package meta_cache - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { - - processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - - for _, sig := range message.Signatures { - if sig == selfSignature && selfSignature != 0 { - return nil - } - } - - dir := resp.Directory - var oldPath util.FullPath - var newEntry *filer.Entry - if message.OldEntry != nil { - oldPath = util.NewFullPath(dir, message.OldEntry.Name) - glog.V(4).Infof("deleting %v", oldPath) - } - - if message.NewEntry != nil { - if message.NewParentPath != "" { - dir = message.NewParentPath - } - key := util.NewFullPath(dir, message.NewEntry.Name) - glog.V(4).Infof("creating %v", key) - newEntry = filer.FromPbEntry(dir, message.NewEntry) - } - err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) - if err == nil { - if message.OldEntry != nil && message.NewEntry != nil { - oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) - mc.invalidateFunc(oldKey) - if message.OldEntry.Name != message.NewEntry.Name { - newKey := util.NewFullPath(dir, message.NewEntry.Name) - mc.invalidateFunc(newKey) - } - } else if message.OldEntry == nil && message.NewEntry != nil { - // no need to invaalidate - } else if message.OldEntry != nil && message.NewEntry == nil { - oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) - mc.invalidateFunc(oldKey) - } - } - - return err - - } - - util.RetryForever("followMetaUpdates", func() error { - return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true) - }, func(err error) bool { - glog.Errorf("follow metadata updates: %v", err) - return true - }) - - return nil -} diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go deleted file mode 100644 index 5c2dcf0e1..000000000 --- a/weed/filesys/unimplemented.go +++ /dev/null @@ -1,22 +0,0 @@ -package filesys - -import ( - "context" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" -) - -// https://github.com/bazil/fuse/issues/130 - -var _ = fs.NodeAccesser(&Dir{}) - -func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error { - return fuse.ENOSYS -} - -var _ = fs.NodeAccesser(&File{}) - -func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error { - return fuse.ENOSYS -} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go deleted file mode 100644 index 84d4bdfa2..000000000 --- a/weed/filesys/wfs.go +++ /dev/null @@ -1,303 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "math" - "math/rand" - "os" - "path" - "path/filepath" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/wdclient" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/util/grace" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" -) - -type Option struct { - MountDirectory string - FilerAddresses []string - filerIndex int - FilerGrpcAddresses []string - GrpcDialOption grpc.DialOption - FilerMountRootPath string - Collection string - Replication string - TtlSec int32 - DiskType types.DiskType - ChunkSizeLimit int64 - ConcurrentWriters int - CacheDir string - CacheSizeMB int64 - DataCenter string - Umask os.FileMode - - MountUid uint32 - MountGid uint32 - MountMode os.FileMode - MountCtime time.Time - MountMtime time.Time - MountParentInode uint64 - - VolumeServerAccess string // how to access volume servers - Cipher bool // whether encrypt data on volume server - UidGidMapper *meta_cache.UidGidMapper - - uniqueCacheDir string - uniqueCacheTempPageDir string -} - -var _ = fs.FS(&WFS{}) -var _ = fs.FSStatfser(&WFS{}) - -type WFS struct { - option *Option - - // contains all open handles, protected by handlesLock - handlesLock sync.Mutex - handles map[uint64]*FileHandle - - bufPool sync.Pool - - stats statsCache - - root fs.Node - fsNodeCache *FsCache - - chunkCache *chunk_cache.TieredChunkCache - metaCache *meta_cache.MetaCache - signature int32 - - // throttle writers - concurrentWriters *util.LimitedConcurrentExecutor - Server *fs.Server -} -type statsCache struct { - filer_pb.StatisticsResponse - lastChecked int64 // unix time in seconds -} - -func NewSeaweedFileSystem(option *Option) *WFS { - wfs := &WFS{ - option: option, - handles: make(map[uint64]*FileHandle), - bufPool: sync.Pool{ - New: func() interface{} { - return make([]byte, option.ChunkSizeLimit) - }, - }, - signature: util.RandomInt32(), - } - wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses)) - wfs.option.setupUniqueCacheDirectory() - if option.CacheSizeMB > 0 { - wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024) - } - - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { - - fsNode := NodeWithId(filePath.AsInode()) - if err := wfs.Server.InvalidateNodeData(fsNode); err != nil { - glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err) - } - - dir, name := filePath.DirAndName() - parent := NodeWithId(util.FullPath(dir).AsInode()) - if dir == option.FilerMountRootPath { - parent = NodeWithId(1) - } - if err := wfs.Server.InvalidateEntry(parent, name); err != nil { - glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err) - } - }) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - }) - - wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1} - wfs.fsNodeCache = newFsCache(wfs.root) - - if wfs.option.ConcurrentWriters > 0 { - wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) - } - - return wfs -} - -func (wfs *WFS) StartBackgroundTasks() { - startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) -} - -func (wfs *WFS) Root() (fs.Node, error) { - return wfs.root, nil -} - -func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) { - - fullpath := file.fullpath() - glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) - - inodeId := file.Id() - - wfs.handlesLock.Lock() - existingHandle, found := wfs.handles[inodeId] - if found && existingHandle != nil && existingHandle.f.isOpen > 0 { - existingHandle.f.isOpen++ - wfs.handlesLock.Unlock() - existingHandle.dirtyPages.SetWriteOnly(writeOnly) - glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen) - return existingHandle - } - wfs.handlesLock.Unlock() - - entry, _ := file.maybeLoadEntry(context.Background()) - file.entry = entry - fileHandle = newFileHandle(file, uid, gid, writeOnly) - - wfs.handlesLock.Lock() - file.isOpen++ - wfs.handles[inodeId] = fileHandle - wfs.handlesLock.Unlock() - fileHandle.handle = inodeId - - glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen) - return -} - -func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { - wfs.handlesLock.Lock() - defer wfs.handlesLock.Unlock() - - glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - - delete(wfs.handles, uint64(handleId)) - - return -} - -// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser -func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error { - - glog.V(4).Infof("reading fs stats: %+v", req) - - if wfs.stats.lastChecked < time.Now().Unix()-20 { - - err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.StatisticsRequest{ - Collection: wfs.option.Collection, - Replication: wfs.option.Replication, - Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec), - DiskType: string(wfs.option.DiskType), - } - - glog.V(4).Infof("reading filer stats: %+v", request) - resp, err := client.Statistics(context.Background(), request) - if err != nil { - glog.V(0).Infof("reading filer stats %v: %v", request, err) - return err - } - glog.V(4).Infof("read filer stats: %+v", resp) - - wfs.stats.TotalSize = resp.TotalSize - wfs.stats.UsedSize = resp.UsedSize - wfs.stats.FileCount = resp.FileCount - wfs.stats.lastChecked = time.Now().Unix() - - return nil - }) - if err != nil { - glog.V(0).Infof("filer Statistics: %v", err) - return err - } - } - - totalDiskSize := wfs.stats.TotalSize - usedDiskSize := wfs.stats.UsedSize - actualFileCount := wfs.stats.FileCount - - // Compute the total number of available blocks - resp.Blocks = totalDiskSize / blockSize - - // Compute the number of used blocks - numBlocks := uint64(usedDiskSize / blockSize) - - // Report the number of free and available blocks for the block size - resp.Bfree = resp.Blocks - numBlocks - resp.Bavail = resp.Blocks - numBlocks - resp.Bsize = uint32(blockSize) - - // Report the total number of possible files in the file system (and those free) - resp.Files = math.MaxInt64 - resp.Ffree = math.MaxInt64 - actualFileCount - - // Report the maximum length of a name and the minimum fragment size - resp.Namelen = 1024 - resp.Frsize = uint32(blockSize) - - return nil -} - -func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) { - if entry.Attributes == nil { - return - } - entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid) -} -func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { - if entry.Attributes == nil { - return - } - entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) -} - -func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { - if wfs.option.VolumeServerAccess == "filerProxy" { - return func(fileId string) (targetUrls []string, err error) { - return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil - } - } - return filer.LookupFn(wfs) -} -func (wfs *WFS) getCurrentFiler() string { - return wfs.option.FilerAddresses[wfs.option.filerIndex] -} - -func (option *Option) setupUniqueCacheDirectory() { - cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8] - option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) - option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw") - os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask) -} - -func (option *Option) getTempFilePageDir() string { - return option.uniqueCacheTempPageDir -} -func (option *Option) getUniqueCacheDir() string { - return option.uniqueCacheDir -} - -type NodeWithId uint64 - -func (n NodeWithId) Id() uint64 { - return uint64(n) -} -func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error { - return nil -} diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go deleted file mode 100644 index 95ebdb9b8..000000000 --- a/weed/filesys/wfs_filer_client.go +++ /dev/null @@ -1,51 +0,0 @@ -package filesys - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -var _ = filer_pb.FilerClient(&WFS{}) - -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { - - return util.Retry("filer grpc", func() error { - - i := wfs.option.filerIndex - n := len(wfs.option.FilerGrpcAddresses) - for x := 0; x < n; x++ { - - filerGrpcAddress := wfs.option.FilerGrpcAddresses[i] - err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, wfs.option.GrpcDialOption) - - if err != nil { - glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) - } else { - wfs.option.filerIndex = i - return nil - } - - i++ - if i >= n { - i = 0 - } - - } - return err - }) - -} - -func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { - if wfs.option.VolumeServerAccess == "publicUrl" { - return location.PublicUrl - } - return location.Url -} diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go deleted file mode 100644 index 42c13cfd0..000000000 --- a/weed/filesys/wfs_write.go +++ /dev/null @@ -1,78 +0,0 @@ -package filesys - -import ( - "context" - "fmt" - "io" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.SaveDataAsChunkFunctionType { - - return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { - var fileId, host string - var auth security.EncodedJwt - - if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - return util.Retry("assignVolume", func() error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: wfs.option.Replication, - Collection: wfs.option.Collection, - TtlSec: wfs.option.TtlSec, - DiskType: string(wfs.option.DiskType), - DataCenter: wfs.option.DataCenter, - Path: string(fullPath), - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth) - loc := &filer_pb.Location{ - Url: resp.Url, - PublicUrl: resp.PublicUrl, - } - host = wfs.AdjustedUrl(loc) - collection, replication = resp.Collection, resp.Replication - - return nil - }) - }); err != nil { - return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if wfs.option.VolumeServerAccess == "filerProxy" { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) - } - uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) - if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) - return nil, "", "", fmt.Errorf("upload data: %v", err) - } - if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) - return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) - } - - if !writeOnly { - wfs.chunkCache.SetChunk(fileId, data) - } - - chunk = uploadResult.ToPbFileChunk(fileId, offset) - return chunk, collection, replication, nil - } -} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go deleted file mode 100644 index 4b2ee0064..000000000 --- a/weed/filesys/xattr.go +++ /dev/null @@ -1,138 +0,0 @@ -package filesys - -import ( - "context" - - "github.com/seaweedfs/fuse" - - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - - if entry == nil { - return fuse.ErrNoXattr - } - if entry.Extended == nil { - return fuse.ErrNoXattr - } - data, found := entry.Extended[req.Name] - if !found { - return fuse.ErrNoXattr - } - if req.Position < uint32(len(data)) { - size := req.Size - if req.Position+size >= uint32(len(data)) { - size = uint32(len(data)) - req.Position - } - if size == 0 { - resp.Xattr = data[req.Position:] - } else { - resp.Xattr = data[req.Position : req.Position+size] - } - } - - return nil - -} - -func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error { - - if entry == nil { - return fuse.EIO - } - - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - data, _ := entry.Extended[req.Name] - - newData := make([]byte, int(req.Position)+len(req.Xattr)) - - copy(newData, data) - - copy(newData[int(req.Position):], req.Xattr) - - entry.Extended[req.Name] = newData - - return nil - -} - -func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error { - - if entry == nil { - return fuse.ErrNoXattr - } - - if entry.Extended == nil { - return fuse.ErrNoXattr - } - - _, found := entry.Extended[req.Name] - - if !found { - return fuse.ErrNoXattr - } - - delete(entry.Extended, req.Name) - - return nil - -} - -func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - - if entry == nil { - return fuse.EIO - } - - for k := range entry.Extended { - resp.Append(k) - } - - size := req.Size - if req.Position+size >= uint32(len(resp.Xattr)) { - size = uint32(len(resp.Xattr)) - req.Position - } - - if size == 0 { - resp.Xattr = resp.Xattr[req.Position:] - } else { - resp.Xattr = resp.Xattr[req.Position : req.Position+size] - } - - return nil - -} - -func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { - - fullpath := util.NewFullPath(dir, name) - // glog.V(3).Infof("read entry cache miss %s", fullpath) - - // return a valid entry for the mount root - if string(fullpath) == wfs.option.FilerMountRootPath { - return &filer_pb.Entry{ - Name: name, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: wfs.option.MountMtime.Unix(), - FileMode: uint32(wfs.option.MountMode), - Uid: wfs.option.MountUid, - Gid: wfs.option.MountGid, - Crtime: wfs.option.MountCtime.Unix(), - }, - }, nil - } - - // read from async meta cache - meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) - cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - return cachedEntry.ToProtoEntry(), cacheErr -} |
