diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 159 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 165 | ||||
| -rw-r--r-- | weed/filesys/file.go | 145 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 219 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 17 |
5 files changed, 648 insertions, 57 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index c1c6afe9c..bf4eda936 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -7,11 +7,12 @@ import ( "path" "sync" - "bazil.org/fuse/fs" "bazil.org/fuse" - "github.com/chrislusf/seaweedfs/weed/filer" + "bazil.org/fuse/fs" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "path/filepath" + "time" ) type Dir struct { @@ -21,21 +22,156 @@ type Dir struct { wfs *WFS } +var _ = fs.Node(&Dir{}) +var _ = fs.NodeCreater(&Dir{}) +var _ = fs.NodeMkdirer(&Dir{}) +var _ = fs.NodeStringLookuper(&Dir{}) +var _ = fs.HandleReadDirAller(&Dir{}) +var _ = fs.NodeRemover(&Dir{}) + func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { - attr.Mode = os.ModeDir | 0777 + + if dir.Path == "/" { + attr.Valid = time.Second + attr.Mode = os.ModeDir | 0777 + return nil + } + + parent, name := filepath.Split(dir.Path) + + var attributes *filer_pb.FuseAttributes + + err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.GetEntryAttributesRequest{ + Name: name, + ParentDir: parent, + } + + glog.V(1).Infof("read dir attr: %v", request) + resp, err := client.GetEntryAttributes(context, request) + if err != nil { + glog.V(0).Infof("read dir attr %v: %v", request, err) + return err + } + + attributes = resp.Attributes + + return nil + }) + + if err != nil { + return err + } + + // glog.V(1).Infof("dir %s: %v", dir.Path, attributes) + // glog.V(1).Infof("dir %s permission: %v", dir.Path, os.FileMode(attributes.FileMode)) + + attr.Mode = os.FileMode(attributes.FileMode) | os.ModeDir + if dir.Path == "/" && attributes.FileMode == 0 { + attr.Valid = time.Second + } + + attr.Mtime = time.Unix(attributes.Mtime, 0) + attr.Ctime = time.Unix(attributes.Mtime, 0) + attr.Gid = attributes.Gid + attr.Uid = attributes.Uid + return nil } +func (dir *Dir) newFile(name string, chunks []*filer_pb.FileChunk) *File { + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + // attributes: &filer_pb.FuseAttributes{}, + Chunks: chunks, + } +} + +func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, + resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { + + err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.CreateEntryRequest{ + Directory: dir.Path, + Entry: &filer_pb.Entry{ + Name: req.Name, + IsDirectory: req.Mode&os.ModeDir > 0, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(req.Mode), + Uid: req.Uid, + Gid: req.Gid, + }, + }, + } + + glog.V(1).Infof("create: %v", request) + if _, err := client.CreateEntry(ctx, request); err != nil { + return fmt.Errorf("create file: %v", err) + } + + return nil + }) + + if err == nil { + file := dir.newFile(req.Name, nil) + dir.NodeMap[req.Name] = file + file.isOpen = true + return file, &FileHandle{ + f: file, + dirtyPages: newDirtyPages(file), + RequestId: req.Header.ID, + NodeId: req.Header.Node, + Uid: req.Uid, + Gid: req.Gid, + }, nil + } + + return nil, nil, err +} + func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { dir.NodeMapLock.Lock() defer dir.NodeMapLock.Unlock() - fmt.Printf("mkdir %+v\n", req) + err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.CreateEntryRequest{ + Directory: dir.Path, + Entry: &filer_pb.Entry{ + Name: req.Name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(req.Mode), + Uid: req.Uid, + Gid: req.Gid, + }, + }, + } + + glog.V(1).Infof("mkdir: %v", request) + if _, err := client.CreateEntry(ctx, request); err != nil { + glog.V(0).Infof("mkdir %v: %v", request, err) + return fmt.Errorf("make dir: %v", err) + } + + return nil + }) - node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs} - dir.NodeMap[req.Name] = node + if err == nil { + node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs} + dir.NodeMap[req.Name] = node + return node, nil + } - return node, nil + return nil, err } func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err error) { @@ -59,7 +195,7 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err erro Name: name, } - glog.V(1).Infof("lookup directory entry: %v", request) + glog.V(4).Infof("lookup directory entry: %v", request) resp, err := client.LookupDirectoryEntry(ctx, request) if err != nil { return err @@ -74,13 +210,13 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err erro if entry.IsDirectory { node = &Dir{Path: path.Join(dir.Path, name), wfs: dir.wfs} } else { - node = &File{FileId: filer.FileId(entry.FileId), Name: name, wfs: dir.wfs} + node = dir.newFile(name, entry.Chunks) } dir.NodeMap[name] = node return node, nil } - return nil, err + return nil, fuse.ENOENT } func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { @@ -91,7 +227,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { Directory: dir.Path, } - glog.V(1).Infof("read directory: %v", request) + glog.V(4).Infof("read directory: %v", request) resp, err := client.ListEntries(ctx, request) if err != nil { return err @@ -104,6 +240,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { } else { dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) + dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry, 300*time.Millisecond) } } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go new file mode 100644 index 000000000..996eb0abb --- /dev/null +++ b/weed/filesys/dirty_page.go @@ -0,0 +1,165 @@ +package filesys + +import ( + "fmt" + "bytes" + "time" + "context" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type ContinuousDirtyPages struct { + hasData bool + Offset int64 + Size int64 + Data []byte + f *File +} + +func newDirtyPages(file *File) *ContinuousDirtyPages { + return &ContinuousDirtyPages{ + Data: make([]byte, file.wfs.chunkSizeLimit), + f: file, + } +} + +func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { + + var chunk *filer_pb.FileChunk + + if len(data) > len(pages.Data) { + // this is more than what buffer can hold. + + // flush existing + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + chunks = append(chunks, chunk) + } else { + glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + pages.Size = 0 + + // flush the big page + if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + + return + } + + if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) || + pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) { + // if the data is out of range, + // or buffer is full if adding new data, + // flush current buffer and add new data + + // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size) + + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + pages.Offset = offset + pages.Size = int64(len(data)) + copy(pages.Data, data) + return + } + + copy(pages.Data[offset-pages.Offset:], data) + pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset) + + return +} + +func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) { + + if pages.Size == 0 { + return nil, nil + } + + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + pages.Size = 0 + if chunk != nil { + glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + } + return +} + +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) { + return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset) +} + +func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { + + if pages.Size == 0 { + return nil, nil + } + + var fileId, host string + + if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: pages.f.wfs.replication, + Collection: pages.f.wfs.collection, + } + + resp, err := client.AssignVolume(ctx, request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + + fileId, host = resp.FileId, resp.Url + + return nil + }); err != nil { + return nil, fmt.Errorf("filer assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + bufReader := bytes.NewReader(pages.Data[:pages.Size]) + uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "") + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) + return nil, fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) + return nil, fmt.Errorf("upload result: %v", uploadResult.Error) + } + + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: offset, + Size: uint64(len(buf)), + Mtime: time.Now().UnixNano(), + }, nil + +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} diff --git a/weed/filesys/file.go b/weed/filesys/file.go index e4c79c055..1fb7d53b1 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -1,75 +1,136 @@ package filesys import ( - "context" - "fmt" - "bazil.org/fuse" - "github.com/chrislusf/seaweedfs/weed/filer" "bazil.org/fuse/fs" + "context" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "os" + "path/filepath" + "time" ) var _ = fs.Node(&File{}) -// var _ = fs.NodeOpener(&File{}) -// var _ = fs.NodeFsyncer(&File{}) -var _ = fs.Handle(&File{}) -var _ = fs.HandleReadAller(&File{}) -// var _ = fs.HandleReader(&File{}) -var _ = fs.HandleWriter(&File{}) +var _ = fs.NodeOpener(&File{}) +var _ = fs.NodeFsyncer(&File{}) +var _ = fs.NodeSetattrer(&File{}) type File struct { - FileId filer.FileId - Name string - wfs *WFS + Chunks []*filer_pb.FileChunk + Name string + dir *Dir + wfs *WFS + attributes *filer_pb.FuseAttributes + isOpen bool } func (file *File) Attr(context context.Context, attr *fuse.Attr) error { - attr.Mode = 0444 - return file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.GetFileAttributesRequest{ - Name: file.Name, - ParentDir: "", //TODO add parent folder - FileId: string(file.FileId), - } + fullPath := filepath.Join(file.dir.Path, file.Name) + + if file.attributes == nil || !file.isOpen { + item := file.wfs.listDirectoryEntriesCache.Get(fullPath) + if item != nil { + entry := item.Value().(*filer_pb.Entry) + file.Chunks = entry.Chunks + file.attributes = entry.Attributes + glog.V(1).Infof("file attr read cached %v attributes", file.Name) + } else { + err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.GetEntryAttributesRequest{ + Name: file.Name, + ParentDir: file.dir.Path, + } - glog.V(1).Infof("read file size: %v", request) - resp, err := client.GetFileAttributes(context, request) - if err != nil { - return err + resp, err := client.GetEntryAttributes(context, request) + if err != nil { + glog.V(0).Infof("file attr read file %v: %v", request, err) + return err + } + + file.attributes = resp.Attributes + file.Chunks = resp.Chunks + + glog.V(1).Infof("file attr %v %+v: %d", fullPath, file.attributes, filer2.TotalSize(file.Chunks)) + + return nil + }) + + if err != nil { + return err + } } + } + + attr.Mode = os.FileMode(file.attributes.FileMode) + attr.Size = filer2.TotalSize(file.Chunks) + attr.Mtime = time.Unix(file.attributes.Mtime, 0) + attr.Gid = file.attributes.Gid + attr.Uid = file.attributes.Uid - attr.Size = resp.Attributes.FileSize + return nil - return nil - }) } -func (file *File) ReadAll(ctx context.Context) (content []byte, err error) { +func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { - err = file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + fullPath := filepath.Join(file.dir.Path, file.Name) - request := &filer_pb.GetFileContentRequest{ - FileId: string(file.FileId), - } + glog.V(3).Infof("%v file open %+v", fullPath, req) + + file.isOpen = true + + return &FileHandle{ + f: file, + dirtyPages: newDirtyPages(file), + RequestId: req.Header.ID, + NodeId: req.Header.Node, + Uid: req.Uid, + Gid: req.Gid, + }, nil + +} + +func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { + fullPath := filepath.Join(file.dir.Path, file.Name) - glog.V(1).Infof("read file content: %v", request) - resp, err := client.GetFileContent(ctx, request) - if err != nil { - return err + glog.V(3).Infof("%v file setattr %+v", fullPath, req) + if req.Valid.Size() { + + glog.V(3).Infof("%v file setattr set size=%v", fullPath, req.Size) + if req.Size == 0 { + // fmt.Printf("truncate %v \n", fullPath) + file.Chunks = nil } + file.attributes.FileSize = req.Size + } + if req.Valid.Mode() { + file.attributes.FileMode = uint32(req.Mode) + } + + if req.Valid.Uid() { + file.attributes.Uid = req.Uid + } - content = resp.Content + if req.Valid.Gid() { + file.attributes.Gid = req.Gid + } - return nil - }) + if req.Valid.Mtime() { + file.attributes.Mtime = req.Mtime.Unix() + } + + return nil - return content, err } -func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - fmt.Printf("write file %+v\n", req) +func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { + // fsync works at OS level + // write the file chunks to the filer + glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req) + return nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go new file mode 100644 index 000000000..bec240de2 --- /dev/null +++ b/weed/filesys/filehandle.go @@ -0,0 +1,219 @@ +package filesys + +import ( + "bazil.org/fuse" + "bazil.org/fuse/fs" + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "strings" + "sync" + "net/http" +) + +type FileHandle struct { + // cache file has been written to + dirtyPages *ContinuousDirtyPages + dirtyMetadata bool + + f *File + RequestId fuse.RequestID // unique ID for request + NodeId fuse.NodeID // file or directory the request is about + Uid uint32 // user ID of process making request + Gid uint32 // group ID of process making request +} + +var _ = fs.Handle(&FileHandle{}) + +// var _ = fs.HandleReadAller(&FileHandle{}) +var _ = fs.HandleReader(&FileHandle{}) +var _ = fs.HandleFlusher(&FileHandle{}) +var _ = fs.HandleWriter(&FileHandle{}) +var _ = fs.HandleReleaser(&FileHandle{}) + +func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { + + glog.V(4).Infof("%v/%v read fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(req.Size)) + + if len(fh.f.Chunks) == 0 { + glog.V(0).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name) + return fmt.Errorf("empty file %v/%v", fh.f.dir.Path, fh.f.Name) + } + + buff := make([]byte, req.Size) + + chunkViews := filer2.ViewFromChunks(fh.f.Chunks, req.Offset, req.Size) + + var vids []string + for _, chunkView := range chunkViews { + vids = append(vids, volumeId(chunkView.FileId)) + } + + vid2Locations := make(map[string]*filer_pb.Locations) + + err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + glog.V(4).Infof("read fh lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return err + } + + vid2Locations = resp.LocationsMap + + return nil + }) + + if err != nil { + glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err) + return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) + } + + var totalRead int64 + var wg sync.WaitGroup + for _, chunkView := range chunkViews { + wg.Add(1) + go func(chunkView *filer2.ChunkView) { + defer wg.Done() + + glog.V(4).Infof("read fh reading chunk: %+v", chunkView) + + locations := vid2Locations[volumeId(chunkView.FileId)] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", chunkView.FileId) + err = fmt.Errorf("failed to locate %s", chunkView.FileId) + return + } + + var n int64 + n, err = util.ReadUrl( + fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), + chunkView.Offset, + int(chunkView.Size), + buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)]) + + if err != nil { + + glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err) + + err = fmt.Errorf("failed to read http://%s/%s: %v", + locations.Locations[0].Url, chunkView.FileId, err) + return + } + + glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) + totalRead += n + + }(chunkView) + } + wg.Wait() + + resp.Data = buff[:totalRead] + + return err +} + +// Write to the file handle +func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { + + // write the request to volume servers + + glog.V(4).Infof("%+v/%v write fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data))) + + chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) + if err != nil { + return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err) + } + + resp.Size = len(req.Data) + + if req.Offset == 0 { + fh.f.attributes.Mime = http.DetectContentType(req.Data) + fh.dirtyMetadata = true + } + + for _, chunk := range chunks { + fh.f.Chunks = append(fh.f.Chunks, chunk) + glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + fh.dirtyMetadata = true + } + + return nil +} + +func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { + + glog.V(4).Infof("%+v/%v release fh", fh.f.dir.Path, fh.f.Name) + + fh.f.isOpen = false + + return nil +} + +// Flush - experimenting with uploading at flush, this slows operations down till it has been +// completely flushed +func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { + // fflush works at fh level + // send the data to the OS + glog.V(4).Infof("%s/%s fh flush %v", fh.f.dir.Path, fh.f.Name, req) + + chunk, err := fh.dirtyPages.FlushToStorage(ctx) + if err != nil { + glog.V(0).Infof("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + } + if chunk != nil { + fh.f.Chunks = append(fh.f.Chunks, chunk) + fh.dirtyMetadata = true + } + + if !fh.dirtyMetadata { + return nil + } + + if len(fh.f.Chunks) == 0 { + glog.V(2).Infof("fh %s/%s flush skipping empty: %v", fh.f.dir.Path, fh.f.Name, req) + return nil + } + + err = fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.UpdateEntryRequest{ + Directory: fh.f.dir.Path, + Entry: &filer_pb.Entry{ + Name: fh.f.Name, + Attributes: fh.f.attributes, + Chunks: fh.f.Chunks, + }, + } + + glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.Chunks)) + for i, chunk := range fh.f.Chunks { + glog.V(1).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update fh: %v", err) + } + + return nil + }) + + if err == nil { + fh.dirtyMetadata = false + } + + return err +} + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index da50ae4a4..4b9e20b95 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -3,17 +3,26 @@ package filesys import ( "bazil.org/fuse/fs" "fmt" - "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/karlseguin/ccache" + "google.golang.org/grpc" ) type WFS struct { - filer string + filer string + listDirectoryEntriesCache *ccache.Cache + collection string + replication string + chunkSizeLimit int64 } -func NewSeaweedFileSystem(filer string) *WFS { +func NewSeaweedFileSystem(filer string, collection string, replication string, chunkSizeLimitMB int) *WFS { return &WFS{ - filer: filer, + filer: filer, + listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)), + collection: collection, + replication: replication, + chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, } } |
