diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 117 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 2 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 108 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 14 | ||||
| -rw-r--r-- | weed/filesys/file.go | 14 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 153 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 22 | ||||
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 87 |
8 files changed, 147 insertions, 370 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index fae289217..0e9e92e16 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -4,7 +4,6 @@ import ( "context" "os" "path" - "path/filepath" "time" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -29,15 +28,13 @@ var _ = fs.NodeRemover(&Dir{}) var _ = fs.NodeRenamer(&Dir{}) var _ = fs.NodeSetattrer(&Dir{}) -func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { +func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { // https://github.com/bazil/fuse/issues/196 attr.Valid = time.Second if dir.Path == dir.wfs.option.FilerMountRootPath { - attr.Uid = dir.wfs.option.MountUid - attr.Gid = dir.wfs.option.MountGid - attr.Mode = dir.wfs.option.MountMode + dir.setRootDirAttributes(attr) return nil } @@ -54,40 +51,14 @@ func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { return nil } - parent, name := filepath.Split(dir.Path) - - err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: parent, - Name: name, - } - - glog.V(1).Infof("read dir %s attr: %v", dir.Path, request) - resp, err := client.LookupDirectoryEntry(context, request) - if err != nil { - if err == filer2.ErrNotFound { - return nil - } - glog.V(0).Infof("read dir %s attr %v: %v", dir.Path, request, err) - return err - } - - if resp.Entry != nil { - dir.attributes = resp.Entry.Attributes - } - - // dir.wfs.listDirectoryEntriesCache.Set(dir.Path, resp.Entry, dir.wfs.option.EntryCacheTtl) - - return nil - }) - + entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path) if err != nil { + glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err) return err } + dir.attributes = entry.Attributes - // glog.V(1).Infof("dir %s: %v", dir.Path, attributes) - // glog.V(1).Infof("dir %s permission: %v", dir.Path, os.FileMode(attributes.FileMode)) + glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode)) attr.Mode = os.FileMode(dir.attributes.FileMode) | os.ModeDir @@ -99,6 +70,16 @@ func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { return nil } +func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { + attr.Uid = dir.wfs.option.MountUid + attr.Gid = dir.wfs.option.MountGid + attr.Mode = dir.wfs.option.MountMode + attr.Crtime = dir.wfs.option.MountCtime + attr.Ctime = dir.wfs.option.MountCtime + attr.Mtime = dir.wfs.option.MountMtime + attr.Atime = dir.wfs.option.MountMtime +} + func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File { return &File{ Name: name, @@ -132,7 +113,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, glog.V(1).Infof("create: %v", request) if request.Entry.IsDirectory { - if err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if _, err := client.CreateEntry(ctx, request); err != nil { glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err) return fuse.EIO @@ -155,7 +136,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: dir.Path, @@ -192,33 +173,18 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { var entry *filer_pb.Entry + fullFilePath := path.Join(dir.Path, req.Name) - item := dir.wfs.listDirectoryEntriesCache.Get(path.Join(dir.Path, req.Name)) + item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath) if item != nil && !item.Expired() { entry = item.Value().(*filer_pb.Entry) } if entry == nil { - err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.Name, - } - - glog.V(4).Infof("lookup directory entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) - return fuse.ENOENT - } - - entry = resp.Entry - - // dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, dir.wfs.option.EntryCacheTtl) - - return nil - }) + entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath) + if err != nil { + return nil, err + } } if entry != nil { @@ -243,7 +209,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { paginationLimit := 1024 remaining := dir.wfs.option.DirListingLimit @@ -305,33 +271,14 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { - var entry *filer_pb.Entry - err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.Name, - } - - glog.V(4).Infof("lookup to-be-removed entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) - return fuse.ENOENT - } - - entry = resp.Entry - - return nil - }) - + entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name)) if err != nil { return err } - dir.wfs.deleteFileChunks(entry.Chunks) + dir.wfs.deleteFileChunks(ctx, entry.Chunks) - return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -355,7 +302,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { - return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -379,6 +326,10 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { + if dir.attributes == nil { + return nil + } + glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle) if req.Valid.Mode() { dir.attributes.FileMode = uint32(req.Mode) @@ -397,7 +348,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus } parentDir, name := filer2.FullPath(dir.Path).DirAndName() - return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 3b3735369..92cf04d58 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -35,7 +35,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, }, } - err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if _, err := client.CreateEntry(ctx, request); err != nil { glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err) return fuse.EIO diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index d29281f35..e72a15758 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -2,118 +2,32 @@ package filesys import ( "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" - "math" - "path/filepath" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { newDir := newDirectory.(*Dir) - return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - // find existing entry - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.OldName, + request := &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: dir.Path, + OldName: req.OldName, + NewDirectory: newDir.Path, + NewName: req.NewName, } - glog.V(4).Infof("find existing directory entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) + _, err := client.AtomicRenameEntry(ctx, request) if err != nil { - glog.V(3).Infof("renaming find %s/%s: %v", dir.Path, req.OldName, err) - return fuse.ENOENT + return fmt.Errorf("renaming %s/%s => %s/%s: %v", dir.Path, req.OldName, newDir.Path, req.NewName, err) } - entry := resp.Entry + return nil - glog.V(4).Infof("found existing directory entry resp: %+v", resp) - - return moveEntry(ctx, client, dir.Path, entry, newDir.Path, req.NewName) }) } - -func moveEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, oldParent string, entry *filer_pb.Entry, newParent, newName string) error { - if entry.IsDirectory { - currentDirPath := filepath.Join(oldParent, entry.Name) - - lastFileName := "" - includeLastFile := false - limit := math.MaxInt32 - for limit > 0 { - request := &filer_pb.ListEntriesRequest{ - Directory: currentDirPath, - StartFromFileName: lastFileName, - InclusiveStartFrom: includeLastFile, - Limit: 1024, - } - glog.V(4).Infof("read directory: %v", request) - resp, err := client.ListEntries(ctx, request) - if err != nil { - glog.V(0).Infof("list %s: %v", oldParent, err) - return fuse.EIO - } - if len(resp.Entries) == 0 { - break - } - - for _, item := range resp.Entries { - lastFileName = item.Name - err := moveEntry(ctx, client, currentDirPath, item, filepath.Join(newParent, newName), item.Name) - if err != nil { - return err - } - limit-- - } - if len(resp.Entries) < 1024 { - break - } - } - - } - - // add to new directory - { - request := &filer_pb.CreateEntryRequest{ - Directory: newParent, - Entry: &filer_pb.Entry{ - Name: newName, - IsDirectory: entry.IsDirectory, - Attributes: entry.Attributes, - Chunks: entry.Chunks, - }, - } - - glog.V(1).Infof("create new entry: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { - glog.V(0).Infof("renaming create %s/%s: %v", newParent, newName, err) - return fuse.EIO - } - } - - // delete old entry - { - request := &filer_pb.DeleteEntryRequest{ - Directory: oldParent, - Name: entry.Name, - IsDeleteData: false, - } - - glog.V(1).Infof("remove old entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - glog.V(0).Infof("renaming delete %s/%s: %v", oldParent, entry.Name, err) - return fuse.EIO - } - - } - - return nil - -} diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 696296e62..baee412b2 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "fmt" + "sync" "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "sync" + "github.com/chrislusf/seaweedfs/weed/security" ) type ContinuousDirtyPages struct { @@ -109,7 +110,7 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6 // flush existing if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { if chunk != nil { - glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) chunks = append(chunks, chunk) } } else { @@ -122,7 +123,7 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6 // flush the new page if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) chunks = append(chunks, chunk) } } else { @@ -164,8 +165,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { var fileId, host string + var auth security.EncodedJwt - if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -181,7 +183,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte return err } - fileId, host = resp.FileId, resp.Url + fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) return nil }); err != nil { @@ -190,7 +192,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "") + uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 4bb169a33..1b359ebbe 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -74,10 +74,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return err } - if file.isOpen { - return nil - } - glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes) if req.Valid.Size() { @@ -109,7 +105,11 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f file.entry.Attributes.Mtime = req.Mtime.Unix() } - return file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if file.isOpen { + return nil + } + + return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.Path, @@ -144,7 +144,7 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error { file.setEntry(entry) // glog.V(1).Infof("file attr read cached %v attributes", file.Name) } else { - err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: file.Name, @@ -194,6 +194,8 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { newVisibles = t } + glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) + file.entry.Chunks = append(file.entry.Chunks, chunks...) } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 0f6ca1164..ceec50e13 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,17 +3,16 @@ package filesys import ( "context" "fmt" + "mime" + "path" + "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gabriel-vasile/mimetype" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" - "net/http" - "strings" - "sync" - "time" ) type FileHandle struct { @@ -65,75 +64,14 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size) - var vids []string - for _, chunkView := range chunkViews { - vids = append(vids, volumeId(chunkView.FileId)) - } - - vid2Locations := make(map[string]*filer_pb.Locations) - - err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset) - glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return err - } - - vid2Locations = resp.LocationsMap - - return nil - }) + resp.Data = buff[:totalRead] if err != nil { - glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err) - return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) + glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - var totalRead int64 - var wg sync.WaitGroup - for _, chunkView := range chunkViews { - wg.Add(1) - go func(chunkView *filer2.ChunkView) { - defer wg.Done() - - glog.V(4).Infof("read fh reading chunk: %+v", chunkView) - - locations := vid2Locations[volumeId(chunkView.FileId)] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", chunkView.FileId) - err = fmt.Errorf("failed to locate %s", chunkView.FileId) - return - } - - var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)], - !chunkView.IsFullChunk) - - if err != nil { - - glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err) - - err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) - return - } - - glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) - totalRead += n - - }(chunkView) - } - wg.Wait() - - resp.Data = buff[:totalRead] - return err } @@ -153,7 +91,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f resp.Size = len(req.Data) if req.Offset == 0 { - fh.contentType = http.DetectContentType(req.Data) + // detect mime type + var possibleExt string + fh.contentType, possibleExt = mimetype.Detect(req.Data) + if ext := path.Ext(fh.f.Name); ext != possibleExt { + fh.contentType = mime.TypeByExtension(ext) + } + fh.dirtyMetadata = true } @@ -196,7 +140,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - return fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -212,70 +156,25 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { Entry: fh.f.entry, } - //glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) - //for i, chunk := range fh.f.entry.Chunks { - // glog.V(4).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) - //} + glog.V(3).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) + for i, chunk := range fh.f.entry.Chunks { + glog.V(3).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil - fh.f.wfs.deleteFileChunks(garbages) if _, err := client.CreateEntry(ctx, request); err != nil { + glog.Errorf("update fh: %v", err) return fmt.Errorf("update fh: %v", err) } - return nil - }) -} - -func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error { - - var vids []string - for _, fileId := range fileIds { - vids = append(vids, volumeId(fileId)) - } - - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { - - m := make(map[string]operation.LookupResult) - - glog.V(4).Infof("remove file lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return m, err + fh.f.wfs.deleteFileChunks(ctx, garbages) + for i, chunk := range garbages { + glog.V(3).Infof("garbage %s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - for _, vid := range vids { - lr := operation.LookupResult{ - VolumeId: vid, - Locations: nil, - } - locations := resp.LocationsMap[vid] - for _, loc := range locations.Locations { - lr.Locations = append(lr.Locations, operation.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = lr - } - - return m, err - } - - _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) - - return err -} - -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId + return nil + }) } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 969514a06..9018c36ed 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -19,6 +19,7 @@ import ( type Option struct { FilerGrpcAddress string + GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string Replication string @@ -28,9 +29,11 @@ type Option struct { DirListingLimit int EntryCacheTtl time.Duration - MountUid uint32 - MountGid uint32 - MountMode os.FileMode + MountUid uint32 + MountGid uint32 + MountMode os.FileMode + MountCtime time.Time + MountMtime time.Time } var _ = fs.FS(&WFS{}) @@ -46,8 +49,6 @@ type WFS struct { pathToHandleLock sync.Mutex bufPool sync.Pool - fileIdsDeletionChan chan []string - stats statsCache } type statsCache struct { @@ -65,11 +66,8 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, - fileIdsDeletionChan: make(chan []string, 32), } - go wfs.loopProcessingDeletion() - return wfs } @@ -77,12 +75,12 @@ func (wfs *WFS) Root() (fs.Node, error) { return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil } -func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, wfs.option.FilerGrpcAddress) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) } @@ -137,7 +135,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index f58ef24f4..6e586b7df 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -2,57 +2,68 @@ package filesys import ( "context" - "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc" ) -func (wfs *WFS) loopProcessingDeletion() { - - ticker := time.NewTicker(2 * time.Second) - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var fileIds []string - for { - select { - case fids := <-wfs.fileIdsDeletionChan: - fileIds = append(fileIds, fids...) - if len(fileIds) >= 1024 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - } - } - }) - -} - -func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { +func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChunk) { if len(chunks) == 0 { return } var fileIds []string for _, chunk := range chunks { - fileIds = append(fileIds, chunk.FileId) + fileIds = append(fileIds, chunk.GetFileIdString()) } - var async = false - if async { - wfs.fileIdsDeletionChan <- fileIds - return - } - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - deleteFileIds(context.Background(), client, fileIds) + wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds) return nil }) } + +func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { + + var vids []string + for _, fileId := range fileIds { + vids = append(vids, filer2.VolumeId(fileId)) + } + + lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { + + m := make(map[string]operation.LookupResult) + + glog.V(4).Infof("remove file lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return m, err + } + + for _, vid := range vids { + lr := operation.LookupResult{ + VolumeId: vid, + Locations: nil, + } + locations := resp.LocationsMap[vid] + for _, loc := range locations.Locations { + lr.Locations = append(lr.Locations, operation.Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + }) + } + m[vid] = lr + } + + return m, err + } + + _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) + + return err +} |
