aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go117
-rw-r--r--weed/filesys/dir_link.go2
-rw-r--r--weed/filesys/dir_rename.go108
-rw-r--r--weed/filesys/dirty_page.go14
-rw-r--r--weed/filesys/file.go14
-rw-r--r--weed/filesys/filehandle.go153
-rw-r--r--weed/filesys/wfs.go22
-rw-r--r--weed/filesys/wfs_deletion.go87
8 files changed, 147 insertions, 370 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index fae289217..0e9e92e16 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -4,7 +4,6 @@ import (
"context"
"os"
"path"
- "path/filepath"
"time"
"github.com/chrislusf/seaweedfs/weed/filer2"
@@ -29,15 +28,13 @@ var _ = fs.NodeRemover(&Dir{})
var _ = fs.NodeRenamer(&Dir{})
var _ = fs.NodeSetattrer(&Dir{})
-func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
+func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
// https://github.com/bazil/fuse/issues/196
attr.Valid = time.Second
if dir.Path == dir.wfs.option.FilerMountRootPath {
- attr.Uid = dir.wfs.option.MountUid
- attr.Gid = dir.wfs.option.MountGid
- attr.Mode = dir.wfs.option.MountMode
+ dir.setRootDirAttributes(attr)
return nil
}
@@ -54,40 +51,14 @@ func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
return nil
}
- parent, name := filepath.Split(dir.Path)
-
- err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: parent,
- Name: name,
- }
-
- glog.V(1).Infof("read dir %s attr: %v", dir.Path, request)
- resp, err := client.LookupDirectoryEntry(context, request)
- if err != nil {
- if err == filer2.ErrNotFound {
- return nil
- }
- glog.V(0).Infof("read dir %s attr %v: %v", dir.Path, request, err)
- return err
- }
-
- if resp.Entry != nil {
- dir.attributes = resp.Entry.Attributes
- }
-
- // dir.wfs.listDirectoryEntriesCache.Set(dir.Path, resp.Entry, dir.wfs.option.EntryCacheTtl)
-
- return nil
- })
-
+ entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path)
if err != nil {
+ glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err)
return err
}
+ dir.attributes = entry.Attributes
- // glog.V(1).Infof("dir %s: %v", dir.Path, attributes)
- // glog.V(1).Infof("dir %s permission: %v", dir.Path, os.FileMode(attributes.FileMode))
+ glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode))
attr.Mode = os.FileMode(dir.attributes.FileMode) | os.ModeDir
@@ -99,6 +70,16 @@ func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
return nil
}
+func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
+ attr.Uid = dir.wfs.option.MountUid
+ attr.Gid = dir.wfs.option.MountGid
+ attr.Mode = dir.wfs.option.MountMode
+ attr.Crtime = dir.wfs.option.MountCtime
+ attr.Ctime = dir.wfs.option.MountCtime
+ attr.Mtime = dir.wfs.option.MountMtime
+ attr.Atime = dir.wfs.option.MountMtime
+}
+
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File {
return &File{
Name: name,
@@ -132,7 +113,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
glog.V(1).Infof("create: %v", request)
if request.Entry.IsDirectory {
- if err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
if _, err := client.CreateEntry(ctx, request); err != nil {
glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err)
return fuse.EIO
@@ -155,7 +136,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
- err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: dir.Path,
@@ -192,33 +173,18 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
var entry *filer_pb.Entry
+ fullFilePath := path.Join(dir.Path, req.Name)
- item := dir.wfs.listDirectoryEntriesCache.Get(path.Join(dir.Path, req.Name))
+ item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath)
if item != nil && !item.Expired() {
entry = item.Value().(*filer_pb.Entry)
}
if entry == nil {
- err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir.Path,
- Name: req.Name,
- }
-
- glog.V(4).Infof("lookup directory entry: %v", request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err)
- return fuse.ENOENT
- }
-
- entry = resp.Entry
-
- // dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, dir.wfs.option.EntryCacheTtl)
-
- return nil
- })
+ entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath)
+ if err != nil {
+ return nil, err
+ }
}
if entry != nil {
@@ -243,7 +209,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
paginationLimit := 1024
remaining := dir.wfs.option.DirListingLimit
@@ -305,33 +271,14 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error {
- var entry *filer_pb.Entry
- err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir.Path,
- Name: req.Name,
- }
-
- glog.V(4).Infof("lookup to-be-removed entry: %v", request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err)
- return fuse.ENOENT
- }
-
- entry = resp.Entry
-
- return nil
- })
-
+ entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name))
if err != nil {
return err
}
- dir.wfs.deleteFileChunks(entry.Chunks)
+ dir.wfs.deleteFileChunks(ctx, entry.Chunks)
- return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: dir.Path,
@@ -355,7 +302,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro
func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error {
- return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: dir.Path,
@@ -379,6 +326,10 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
+ if dir.attributes == nil {
+ return nil
+ }
+
glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle)
if req.Valid.Mode() {
dir.attributes.FileMode = uint32(req.Mode)
@@ -397,7 +348,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
}
parentDir, name := filer2.FullPath(dir.Path).DirAndName()
- return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 3b3735369..92cf04d58 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -35,7 +35,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
},
}
- err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
if _, err := client.CreateEntry(ctx, request); err != nil {
glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
return fuse.EIO
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index d29281f35..e72a15758 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,118 +2,32 @@ package filesys
import (
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
- "math"
- "path/filepath"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
newDir := newDirectory.(*Dir)
- return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- // find existing entry
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir.Path,
- Name: req.OldName,
+ request := &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: dir.Path,
+ OldName: req.OldName,
+ NewDirectory: newDir.Path,
+ NewName: req.NewName,
}
- glog.V(4).Infof("find existing directory entry: %v", request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
+ _, err := client.AtomicRenameEntry(ctx, request)
if err != nil {
- glog.V(3).Infof("renaming find %s/%s: %v", dir.Path, req.OldName, err)
- return fuse.ENOENT
+ return fmt.Errorf("renaming %s/%s => %s/%s: %v", dir.Path, req.OldName, newDir.Path, req.NewName, err)
}
- entry := resp.Entry
+ return nil
- glog.V(4).Infof("found existing directory entry resp: %+v", resp)
-
- return moveEntry(ctx, client, dir.Path, entry, newDir.Path, req.NewName)
})
}
-
-func moveEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, oldParent string, entry *filer_pb.Entry, newParent, newName string) error {
- if entry.IsDirectory {
- currentDirPath := filepath.Join(oldParent, entry.Name)
-
- lastFileName := ""
- includeLastFile := false
- limit := math.MaxInt32
- for limit > 0 {
- request := &filer_pb.ListEntriesRequest{
- Directory: currentDirPath,
- StartFromFileName: lastFileName,
- InclusiveStartFrom: includeLastFile,
- Limit: 1024,
- }
- glog.V(4).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- glog.V(0).Infof("list %s: %v", oldParent, err)
- return fuse.EIO
- }
- if len(resp.Entries) == 0 {
- break
- }
-
- for _, item := range resp.Entries {
- lastFileName = item.Name
- err := moveEntry(ctx, client, currentDirPath, item, filepath.Join(newParent, newName), item.Name)
- if err != nil {
- return err
- }
- limit--
- }
- if len(resp.Entries) < 1024 {
- break
- }
- }
-
- }
-
- // add to new directory
- {
- request := &filer_pb.CreateEntryRequest{
- Directory: newParent,
- Entry: &filer_pb.Entry{
- Name: newName,
- IsDirectory: entry.IsDirectory,
- Attributes: entry.Attributes,
- Chunks: entry.Chunks,
- },
- }
-
- glog.V(1).Infof("create new entry: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.V(0).Infof("renaming create %s/%s: %v", newParent, newName, err)
- return fuse.EIO
- }
- }
-
- // delete old entry
- {
- request := &filer_pb.DeleteEntryRequest{
- Directory: oldParent,
- Name: entry.Name,
- IsDeleteData: false,
- }
-
- glog.V(1).Infof("remove old entry: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- glog.V(0).Infof("renaming delete %s/%s: %v", oldParent, entry.Name, err)
- return fuse.EIO
- }
-
- }
-
- return nil
-
-}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 696296e62..baee412b2 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
+ "sync"
"sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "sync"
+ "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -109,7 +110,7 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6
// flush existing
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
if chunk != nil {
- glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
chunks = append(chunks, chunk)
}
} else {
@@ -122,7 +123,7 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6
// flush the new page
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
if chunk != nil {
- glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
chunks = append(chunks, chunk)
}
} else {
@@ -164,8 +165,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
var fileId, host string
+ var auth security.EncodedJwt
- if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -181,7 +183,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
return err
}
- fileId, host = resp.FileId, resp.Url
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
return nil
}); err != nil {
@@ -190,7 +192,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
+ uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err)
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4bb169a33..1b359ebbe 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -74,10 +74,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
return err
}
- if file.isOpen {
- return nil
- }
-
glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
if req.Valid.Size() {
@@ -109,7 +105,11 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.entry.Attributes.Mtime = req.Mtime.Unix()
}
- return file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if file.isOpen {
+ return nil
+ }
+
+ return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: file.dir.Path,
@@ -144,7 +144,7 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error {
file.setEntry(entry)
// glog.V(1).Infof("file attr read cached %v attributes", file.Name)
} else {
- err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: file.Name,
@@ -194,6 +194,8 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
newVisibles = t
}
+ glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+
file.entry.Chunks = append(file.entry.Chunks, chunks...)
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 0f6ca1164..ceec50e13 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -3,17 +3,16 @@ package filesys
import (
"context"
"fmt"
+ "mime"
+ "path"
+ "time"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/gabriel-vasile/mimetype"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
- "net/http"
- "strings"
- "sync"
- "time"
)
type FileHandle struct {
@@ -65,75 +64,14 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size)
- var vids []string
- for _, chunkView := range chunkViews {
- vids = append(vids, volumeId(chunkView.FileId))
- }
-
- vid2Locations := make(map[string]*filer_pb.Locations)
-
- err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset)
- glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return err
- }
-
- vid2Locations = resp.LocationsMap
-
- return nil
- })
+ resp.Data = buff[:totalRead]
if err != nil {
- glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err)
- return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
+ glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
- var totalRead int64
- var wg sync.WaitGroup
- for _, chunkView := range chunkViews {
- wg.Add(1)
- go func(chunkView *filer2.ChunkView) {
- defer wg.Done()
-
- glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
-
- locations := vid2Locations[volumeId(chunkView.FileId)]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", chunkView.FileId)
- err = fmt.Errorf("failed to locate %s", chunkView.FileId)
- return
- }
-
- var n int64
- n, err = util.ReadUrl(
- fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
- chunkView.Offset,
- int(chunkView.Size),
- buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)],
- !chunkView.IsFullChunk)
-
- if err != nil {
-
- glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err)
-
- err = fmt.Errorf("failed to read http://%s/%s: %v",
- locations.Locations[0].Url, chunkView.FileId, err)
- return
- }
-
- glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
- totalRead += n
-
- }(chunkView)
- }
- wg.Wait()
-
- resp.Data = buff[:totalRead]
-
return err
}
@@ -153,7 +91,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
resp.Size = len(req.Data)
if req.Offset == 0 {
- fh.contentType = http.DetectContentType(req.Data)
+ // detect mime type
+ var possibleExt string
+ fh.contentType, possibleExt = mimetype.Detect(req.Data)
+ if ext := path.Ext(fh.f.Name); ext != possibleExt {
+ fh.contentType = mime.TypeByExtension(ext)
+ }
+
fh.dirtyMetadata = true
}
@@ -196,7 +140,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return nil
}
- return fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
@@ -212,70 +156,25 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
Entry: fh.f.entry,
}
- //glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks))
- //for i, chunk := range fh.f.entry.Chunks {
- // glog.V(4).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
- //}
+ glog.V(3).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks))
+ for i, chunk := range fh.f.entry.Chunks {
+ glog.V(3).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
- fh.f.wfs.deleteFileChunks(garbages)
if _, err := client.CreateEntry(ctx, request); err != nil {
+ glog.Errorf("update fh: %v", err)
return fmt.Errorf("update fh: %v", err)
}
- return nil
- })
-}
-
-func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error {
-
- var vids []string
- for _, fileId := range fileIds {
- vids = append(vids, volumeId(fileId))
- }
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
-
- m := make(map[string]operation.LookupResult)
-
- glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return m, err
+ fh.f.wfs.deleteFileChunks(ctx, garbages)
+ for i, chunk := range garbages {
+ glog.V(3).Infof("garbage %s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- for _, vid := range vids {
- lr := operation.LookupResult{
- VolumeId: vid,
- Locations: nil,
- }
- locations := resp.LocationsMap[vid]
- for _, loc := range locations.Locations {
- lr.Locations = append(lr.Locations, operation.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- })
- }
- m[vid] = lr
- }
-
- return m, err
- }
-
- _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
-
- return err
-}
-
-func volumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
+ return nil
+ })
}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 969514a06..9018c36ed 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -19,6 +19,7 @@ import (
type Option struct {
FilerGrpcAddress string
+ GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
@@ -28,9 +29,11 @@ type Option struct {
DirListingLimit int
EntryCacheTtl time.Duration
- MountUid uint32
- MountGid uint32
- MountMode os.FileMode
+ MountUid uint32
+ MountGid uint32
+ MountMode os.FileMode
+ MountCtime time.Time
+ MountMtime time.Time
}
var _ = fs.FS(&WFS{})
@@ -46,8 +49,6 @@ type WFS struct {
pathToHandleLock sync.Mutex
bufPool sync.Pool
- fileIdsDeletionChan chan []string
-
stats statsCache
}
type statsCache struct {
@@ -65,11 +66,8 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return make([]byte, option.ChunkSizeLimit)
},
},
- fileIdsDeletionChan: make(chan []string, 32),
}
- go wfs.loopProcessingDeletion()
-
return wfs
}
@@ -77,12 +75,12 @@ func (wfs *WFS) Root() (fs.Node, error) {
return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
}
-func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, wfs.option.FilerGrpcAddress)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
}
@@ -137,7 +135,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
if wfs.stats.lastChecked < time.Now().Unix()-20 {
- err := wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index f58ef24f4..6e586b7df 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -2,57 +2,68 @@ package filesys
import (
"context"
- "time"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/grpc"
)
-func (wfs *WFS) loopProcessingDeletion() {
-
- ticker := time.NewTicker(2 * time.Second)
-
- wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- var fileIds []string
- for {
- select {
- case fids := <-wfs.fileIdsDeletionChan:
- fileIds = append(fileIds, fids...)
- if len(fileIds) >= 1024 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- deleteFileIds(context.Background(), client, fileIds)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- deleteFileIds(context.Background(), client, fileIds)
- fileIds = fileIds[:0]
- }
- }
- }
- })
-
-}
-
-func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
+func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
if len(chunks) == 0 {
return
}
var fileIds []string
for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.FileId)
+ fileIds = append(fileIds, chunk.GetFileIdString())
}
- var async = false
- if async {
- wfs.fileIdsDeletionChan <- fileIds
- return
- }
-
- wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(context.Background(), client, fileIds)
+ wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}
+
+func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
+
+ var vids []string
+ for _, fileId := range fileIds {
+ vids = append(vids, filer2.VolumeId(fileId))
+ }
+
+ lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
+
+ m := make(map[string]operation.LookupResult)
+
+ glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
+ if err != nil {
+ return m, err
+ }
+
+ for _, vid := range vids {
+ lr := operation.LookupResult{
+ VolumeId: vid,
+ Locations: nil,
+ }
+ locations := resp.LocationsMap[vid]
+ for _, loc := range locations.Locations {
+ lr.Locations = append(lr.Locations, operation.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ })
+ }
+ m[vid] = lr
+ }
+
+ return m, err
+ }
+
+ _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
+
+ return err
+}