Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dir.go         | 159
-rw-r--r--  weed/filesys/dirty_page.go  | 165
-rw-r--r--  weed/filesys/file.go        | 145
-rw-r--r--  weed/filesys/filehandle.go  | 219
-rw-r--r--  weed/filesys/wfs.go         |  17
5 files changed, 648 insertions, 57 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index c1c6afe9c..bf4eda936 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -7,11 +7,12 @@ import (
"path"
"sync"
- "bazil.org/fuse/fs"
"bazil.org/fuse"
- "github.com/chrislusf/seaweedfs/weed/filer"
+ "bazil.org/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "path/filepath"
+ "time"
)
type Dir struct {
@@ -21,21 +22,156 @@ type Dir struct {
wfs *WFS
}
+var _ = fs.Node(&Dir{})
+var _ = fs.NodeCreater(&Dir{})
+var _ = fs.NodeMkdirer(&Dir{})
+var _ = fs.NodeStringLookuper(&Dir{})
+var _ = fs.HandleReadDirAller(&Dir{})
+var _ = fs.NodeRemover(&Dir{})
+
func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
- attr.Mode = os.ModeDir | 0777
+
+ if dir.Path == "/" {
+ attr.Valid = time.Second
+ attr.Mode = os.ModeDir | 0777
+ return nil
+ }
+
+ parent, name := filepath.Split(dir.Path)
+
+ var attributes *filer_pb.FuseAttributes
+
+ err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.GetEntryAttributesRequest{
+ Name: name,
+ ParentDir: parent,
+ }
+
+ glog.V(1).Infof("read dir attr: %v", request)
+ resp, err := client.GetEntryAttributes(context, request)
+ if err != nil {
+ glog.V(0).Infof("read dir attr %v: %v", request, err)
+ return err
+ }
+
+ attributes = resp.Attributes
+
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ // glog.V(1).Infof("dir %s: %v", dir.Path, attributes)
+ // glog.V(1).Infof("dir %s permission: %v", dir.Path, os.FileMode(attributes.FileMode))
+
+ attr.Mode = os.FileMode(attributes.FileMode) | os.ModeDir
+
+ attr.Mtime = time.Unix(attributes.Mtime, 0)
+ attr.Ctime = time.Unix(attributes.Mtime, 0)
+ attr.Gid = attributes.Gid
+ attr.Uid = attributes.Uid
+
return nil
}
+func (dir *Dir) newFile(name string, chunks []*filer_pb.FileChunk) *File {
+ return &File{
+ Name: name,
+ dir: dir,
+ wfs: dir.wfs,
+ // attributes: &filer_pb.FuseAttributes{},
+ Chunks: chunks,
+ }
+}
+
+func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
+ resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
+
+ err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir.Path,
+ Entry: &filer_pb.Entry{
+ Name: req.Name,
+ IsDirectory: req.Mode&os.ModeDir > 0,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(req.Mode),
+ Uid: req.Uid,
+ Gid: req.Gid,
+ },
+ },
+ }
+
+ glog.V(1).Infof("create: %v", request)
+ if _, err := client.CreateEntry(ctx, request); err != nil {
+ return fmt.Errorf("create file: %v", err)
+ }
+
+ return nil
+ })
+
+ if err == nil {
+ file := dir.newFile(req.Name, nil)
+ dir.NodeMapLock.Lock()
+ dir.NodeMap[req.Name] = file
+ dir.NodeMapLock.Unlock()
+ file.isOpen = true
+ return file, &FileHandle{
+ f: file,
+ dirtyPages: newDirtyPages(file),
+ RequestId: req.Header.ID,
+ NodeId: req.Header.Node,
+ Uid: req.Uid,
+ Gid: req.Gid,
+ }, nil
+ }
+
+ return nil, nil, err
+}
+
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
dir.NodeMapLock.Lock()
defer dir.NodeMapLock.Unlock()
- fmt.Printf("mkdir %+v\n", req)
+ err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir.Path,
+ Entry: &filer_pb.Entry{
+ Name: req.Name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(req.Mode),
+ Uid: req.Uid,
+ Gid: req.Gid,
+ },
+ },
+ }
+
+ glog.V(1).Infof("mkdir: %v", request)
+ if _, err := client.CreateEntry(ctx, request); err != nil {
+ glog.V(0).Infof("mkdir %v: %v", request, err)
+ return fmt.Errorf("make dir: %v", err)
+ }
+
+ return nil
+ })
- node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
- dir.NodeMap[req.Name] = node
+ if err == nil {
+ node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
+ dir.NodeMap[req.Name] = node
+ return node, nil
+ }
- return node, nil
+ return nil, err
}
func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err error) {
@@ -59,7 +195,7 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err erro
Name: name,
}
- glog.V(1).Infof("lookup directory entry: %v", request)
+ glog.V(4).Infof("lookup directory entry: %v", request)
resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
return err
@@ -74,13 +210,13 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err erro
if entry.IsDirectory {
node = &Dir{Path: path.Join(dir.Path, name), wfs: dir.wfs}
} else {
- node = &File{FileId: filer.FileId(entry.FileId), Name: name, wfs: dir.wfs}
+ node = dir.newFile(name, entry.Chunks)
}
dir.NodeMap[name] = node
return node, nil
}
- return nil, err
+ return nil, fuse.ENOENT
}
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
@@ -91,7 +227,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
Directory: dir.Path,
}
- glog.V(1).Infof("read directory: %v", request)
+ glog.V(4).Infof("read directory: %v", request)
resp, err := client.ListEntries(ctx, request)
if err != nil {
return err
@@ -104,6 +240,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
} else {
dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
ret = append(ret, dirent)
+ dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry, 300*time.Millisecond)
}
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
new file mode 100644
index 000000000..996eb0abb
--- /dev/null
+++ b/weed/filesys/dirty_page.go
@@ -0,0 +1,165 @@
+package filesys
+
+import (
+ "fmt"
+ "bytes"
+ "time"
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type ContinuousDirtyPages struct {
+ hasData bool
+ Offset int64
+ Size int64
+ Data []byte
+ f *File
+}
+
+func newDirtyPages(file *File) *ContinuousDirtyPages {
+ return &ContinuousDirtyPages{
+ Data: make([]byte, file.wfs.chunkSizeLimit),
+ f: file,
+ }
+}
+
+func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+
+ var chunk *filer_pb.FileChunk
+
+ if len(data) > len(pages.Data) {
+ // this is more than what buffer can hold.
+
+ // flush existing
+ if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s failed to flush existing pages: %v", pages.f.dir.Path, pages.f.Name, err)
+ return
+ }
+ pages.Size = 0
+
+ // flush the big page
+ if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return
+ }
+
+ return
+ }
+
+ if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
+ pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
+ // if the data is out of range,
+ // or the buffer would overflow once the new data is added,
+ // flush the current buffer first, then restart it with the new data
+
+ // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
+
+ if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return
+ }
+ pages.Offset = offset
+ pages.Size = int64(len(data))
+ copy(pages.Data, data)
+ return
+ }
+
+ copy(pages.Data[offset-pages.Offset:], data)
+ pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
+
+ return
+}
+
+func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+
+ if pages.Size == 0 {
+ return nil, nil
+ }
+
+ if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+ pages.Size = 0
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
+ }
+ return
+}
+
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+ return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
+}
+
+func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
+
+ if len(buf) == 0 {
+ return nil, nil
+ }
+
+ var fileId, host string
+
+ if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: pages.f.wfs.replication,
+ Collection: pages.f.wfs.collection,
+ }
+
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+
+ fileId, host = resp.FileId, resp.Url
+
+ return nil
+ }); err != nil {
+ return nil, fmt.Errorf("filer assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ bufReader := bytes.NewReader(buf)
+ uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
+ return nil, fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
+ return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ return &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: offset,
+ Size: uint64(len(buf)),
+ Mtime: time.Now().UnixNano(),
+ }, nil
+
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index e4c79c055..1fb7d53b1 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -1,75 +1,136 @@
package filesys
import (
- "context"
- "fmt"
-
"bazil.org/fuse"
- "github.com/chrislusf/seaweedfs/weed/filer"
"bazil.org/fuse/fs"
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "os"
+ "path/filepath"
+ "time"
)
var _ = fs.Node(&File{})
-// var _ = fs.NodeOpener(&File{})
-// var _ = fs.NodeFsyncer(&File{})
-var _ = fs.Handle(&File{})
-var _ = fs.HandleReadAller(&File{})
-// var _ = fs.HandleReader(&File{})
-var _ = fs.HandleWriter(&File{})
+var _ = fs.NodeOpener(&File{})
+var _ = fs.NodeFsyncer(&File{})
+var _ = fs.NodeSetattrer(&File{})
type File struct {
- FileId filer.FileId
- Name string
- wfs *WFS
+ Chunks []*filer_pb.FileChunk
+ Name string
+ dir *Dir
+ wfs *WFS
+ attributes *filer_pb.FuseAttributes
+ isOpen bool
}
func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
- attr.Mode = 0444
- return file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.GetFileAttributesRequest{
- Name: file.Name,
- ParentDir: "", //TODO add parent folder
- FileId: string(file.FileId),
- }
+ fullPath := filepath.Join(file.dir.Path, file.Name)
+
+ if file.attributes == nil || !file.isOpen {
+ item := file.wfs.listDirectoryEntriesCache.Get(fullPath)
+ if item != nil {
+ entry := item.Value().(*filer_pb.Entry)
+ file.Chunks = entry.Chunks
+ file.attributes = entry.Attributes
+ glog.V(1).Infof("file attr read cached %v attributes", file.Name)
+ } else {
+ err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.GetEntryAttributesRequest{
+ Name: file.Name,
+ ParentDir: file.dir.Path,
+ }
- glog.V(1).Infof("read file size: %v", request)
- resp, err := client.GetFileAttributes(context, request)
- if err != nil {
- return err
+ resp, err := client.GetEntryAttributes(context, request)
+ if err != nil {
+ glog.V(0).Infof("file attr read file %v: %v", request, err)
+ return err
+ }
+
+ file.attributes = resp.Attributes
+ file.Chunks = resp.Chunks
+
+ glog.V(1).Infof("file attr %v %+v: %d", fullPath, file.attributes, filer2.TotalSize(file.Chunks))
+
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
}
+ }
+
+ attr.Mode = os.FileMode(file.attributes.FileMode)
+ attr.Size = filer2.TotalSize(file.Chunks)
+ attr.Mtime = time.Unix(file.attributes.Mtime, 0)
+ attr.Gid = file.attributes.Gid
+ attr.Uid = file.attributes.Uid
- attr.Size = resp.Attributes.FileSize
+ return nil
- return nil
- })
}
-func (file *File) ReadAll(ctx context.Context) (content []byte, err error) {
+func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
- err = file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ fullPath := filepath.Join(file.dir.Path, file.Name)
- request := &filer_pb.GetFileContentRequest{
- FileId: string(file.FileId),
- }
+ glog.V(3).Infof("%v file open %+v", fullPath, req)
+
+ file.isOpen = true
+
+ return &FileHandle{
+ f: file,
+ dirtyPages: newDirtyPages(file),
+ RequestId: req.Header.ID,
+ NodeId: req.Header.Node,
+ Uid: req.Uid,
+ Gid: req.Gid,
+ }, nil
+
+}
+
+func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
+ fullPath := filepath.Join(file.dir.Path, file.Name)
- glog.V(1).Infof("read file content: %v", request)
- resp, err := client.GetFileContent(ctx, request)
- if err != nil {
- return err
+ glog.V(3).Infof("%v file setattr %+v", fullPath, req)
+ if req.Valid.Size() {
+
+ glog.V(3).Infof("%v file setattr set size=%v", fullPath, req.Size)
+ if req.Size == 0 {
+ // fmt.Printf("truncate %v \n", fullPath)
+ file.Chunks = nil
}
+ file.attributes.FileSize = req.Size
+ }
+ if req.Valid.Mode() {
+ file.attributes.FileMode = uint32(req.Mode)
+ }
+
+ if req.Valid.Uid() {
+ file.attributes.Uid = req.Uid
+ }
- content = resp.Content
+ if req.Valid.Gid() {
+ file.attributes.Gid = req.Gid
+ }
- return nil
- })
+ if req.Valid.Mtime() {
+ file.attributes.Mtime = req.Mtime.Unix()
+ }
+
+ return nil
- return content, err
}
-func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
- fmt.Printf("write file %+v\n", req)
+func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
+ // fsync works at OS level
+ // write the file chunks to the filer
+ glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
+
return nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
new file mode 100644
index 000000000..bec240de2
--- /dev/null
+++ b/weed/filesys/filehandle.go
@@ -0,0 +1,219 @@
+package filesys
+
+import (
+ "bazil.org/fuse"
+ "bazil.org/fuse/fs"
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
+ "sync"
+ "net/http"
+)
+
+type FileHandle struct {
+ // cache file has been written to
+ dirtyPages *ContinuousDirtyPages
+ dirtyMetadata bool
+
+ f *File
+ RequestId fuse.RequestID // unique ID for request
+ NodeId fuse.NodeID // file or directory the request is about
+ Uid uint32 // user ID of process making request
+ Gid uint32 // group ID of process making request
+}
+
+var _ = fs.Handle(&FileHandle{})
+
+// var _ = fs.HandleReadAller(&FileHandle{})
+var _ = fs.HandleReader(&FileHandle{})
+var _ = fs.HandleFlusher(&FileHandle{})
+var _ = fs.HandleWriter(&FileHandle{})
+var _ = fs.HandleReleaser(&FileHandle{})
+
+func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
+
+ glog.V(4).Infof("%v/%v read fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(req.Size))
+
+ if len(fh.f.Chunks) == 0 {
+ glog.V(0).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
+ return fmt.Errorf("empty file %v/%v", fh.f.dir.Path, fh.f.Name)
+ }
+
+ buff := make([]byte, req.Size)
+
+ chunkViews := filer2.ViewFromChunks(fh.f.Chunks, req.Offset, req.Size)
+
+ var vids []string
+ for _, chunkView := range chunkViews {
+ vids = append(vids, volumeId(chunkView.FileId))
+ }
+
+ vid2Locations := make(map[string]*filer_pb.Locations)
+
+ err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
+ if err != nil {
+ return err
+ }
+
+ vid2Locations = resp.LocationsMap
+
+ return nil
+ })
+
+ if err != nil {
+ glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err)
+ return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
+ }
+
+ var totalRead int64
+ var readErr error
+ var mu sync.Mutex // guards totalRead and readErr across the reader goroutines
+ var wg sync.WaitGroup
+ for _, chunkView := range chunkViews {
+ wg.Add(1)
+ go func(chunkView *filer2.ChunkView) {
+ defer wg.Done()
+
+ glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
+
+ locations := vid2Locations[volumeId(chunkView.FileId)]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", chunkView.FileId)
+ mu.Lock()
+ readErr = fmt.Errorf("failed to locate %s", chunkView.FileId)
+ mu.Unlock()
+ return
+ }
+
+ n, err := util.ReadUrl(
+ fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
+ chunkView.Offset,
+ int(chunkView.Size),
+ buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)])
+
+ if err != nil {
+ glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err)
+ mu.Lock()
+ readErr = fmt.Errorf("failed to read http://%s/%s: %v", locations.Locations[0].Url, chunkView.FileId, err)
+ mu.Unlock()
+ return
+ }
+
+ glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
+ mu.Lock()
+ totalRead += n
+ mu.Unlock()
+
+ }(chunkView)
+ }
+ wg.Wait()
+
+ resp.Data = buff[:totalRead]
+
+ return readErr
+}
+
+// Write to the file handle
+func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+
+ // write the request to volume servers
+
+ glog.V(4).Infof("%+v/%v write fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)))
+
+ chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
+ if err != nil {
+ return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
+ }
+
+ resp.Size = len(req.Data)
+
+ if req.Offset == 0 {
+ fh.f.attributes.Mime = http.DetectContentType(req.Data)
+ fh.dirtyMetadata = true
+ }
+
+ for _, chunk := range chunks {
+ fh.f.Chunks = append(fh.f.Chunks, chunk)
+ glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ fh.dirtyMetadata = true
+ }
+
+ return nil
+}
+
+func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
+
+ glog.V(4).Infof("%+v/%v release fh", fh.f.dir.Path, fh.f.Name)
+
+ fh.f.isOpen = false
+
+ return nil
+}
+
+// Flush - experimenting with uploading at flush; this slows operations down
+// until the data has been completely flushed
+func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+ // fflush works at fh level
+ // send the data to the OS
+ glog.V(4).Infof("%s/%s fh flush %v", fh.f.dir.Path, fh.f.Name, req)
+
+ chunk, err := fh.dirtyPages.FlushToStorage(ctx)
+ if err != nil {
+ glog.V(0).Infof("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
+ return fmt.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
+ }
+ if chunk != nil {
+ fh.f.Chunks = append(fh.f.Chunks, chunk)
+ fh.dirtyMetadata = true
+ }
+
+ if !fh.dirtyMetadata {
+ return nil
+ }
+
+ if len(fh.f.Chunks) == 0 {
+ glog.V(2).Infof("fh %s/%s flush skipping empty: %v", fh.f.dir.Path, fh.f.Name, req)
+ return nil
+ }
+
+ err = fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: fh.f.dir.Path,
+ Entry: &filer_pb.Entry{
+ Name: fh.f.Name,
+ Attributes: fh.f.attributes,
+ Chunks: fh.f.Chunks,
+ },
+ }
+
+ glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.Chunks))
+ for i, chunk := range fh.f.Chunks {
+ glog.V(1).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update fh: %v", err)
+ }
+
+ return nil
+ })
+
+ if err == nil {
+ fh.dirtyMetadata = false
+ }
+
+ return err
+}
+
+func volumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index da50ae4a4..4b9e20b95 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -3,17 +3,26 @@ package filesys
import (
"bazil.org/fuse/fs"
"fmt"
- "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/karlseguin/ccache"
+ "google.golang.org/grpc"
)
type WFS struct {
- filer string
+ filer string
+ listDirectoryEntriesCache *ccache.Cache
+ collection string
+ replication string
+ chunkSizeLimit int64
}
-func NewSeaweedFileSystem(filer string) *WFS {
+func NewSeaweedFileSystem(filer string, collection string, replication string, chunkSizeLimitMB int) *WFS {
return &WFS{
- filer: filer,
+ filer: filer,
+ listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
+ collection: collection,
+ replication: replication,
+ chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
}
}