diff options
Diffstat (limited to 'weed/mount/weedfs_file_sync.go')
| -rw-r--r-- | weed/mount/weedfs_file_sync.go | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go new file mode 100644 index 000000000..8fb7c73b4 --- /dev/null +++ b/weed/mount/weedfs_file_sync.go @@ -0,0 +1,178 @@ +package mount + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/hanwen/go-fuse/v2/fuse" + "time" +) + +/** + * Flush method + * + * This is called on each close() of the opened file. + * + * Since file descriptors can be duplicated (dup, dup2, fork), for + * one open call there may be many flush calls. + * + * Filesystems shouldn't assume that flush will always be called + * after some writes, or that if will be called at all. + * + * fi->fh will contain the value set by the open method, or will + * be undefined if the open method didn't set any value. + * + * NOTE: the name of the method is misleading, since (unlike + * fsync) the filesystem is not forced to flush pending writes. + * One reason to flush data is if the filesystem wants to return + * write errors during close. However, such use is non-portable + * because POSIX does not require [close] to wait for delayed I/O to + * complete. + * + * If the filesystem supports file locking operations (setlk, + * getlk) it should remove all locks belonging to 'fi->owner'. + * + * If this request is answered with an error code of ENOSYS, + * this is treated as success and future calls to flush() will + * succeed automatically without being send to the filesystem + * process. + * + * Valid replies: + * fuse_reply_err + * + * @param req request handle + * @param ino the inode number + * @param fi file information + * + * [close]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html + */ +func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status { + fh := wfs.GetHandle(FileHandleId(in.Fh)) + if fh == nil { + return fuse.ENOENT + } + + fh.Lock() + defer fh.Unlock() + + return wfs.doFlush(fh, in.Uid, in.Gid) +} + +/** + * Synchronize file contents + * + * If the datasync parameter is non-zero, then only the user data + * should be flushed, not the meta data. + * + * If this request is answered with an error code of ENOSYS, + * this is treated as success and future calls to fsync() will + * succeed automatically without being send to the filesystem + * process. + * + * Valid replies: + * fuse_reply_err + * + * @param req request handle + * @param ino the inode number + * @param datasync flag indicating if only data should be flushed + * @param fi file information + */ +func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Status) { + + fh := wfs.GetHandle(FileHandleId(in.Fh)) + if fh == nil { + return fuse.ENOENT + } + + fh.Lock() + defer fh.Unlock() + + return wfs.doFlush(fh, in.Uid, in.Gid) + +} + +func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { + // flush works at fh level + fileFullPath := fh.FullPath() + dir, _ := fileFullPath.DirAndName() + // send the data to the OS + glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.handle) + + if err := fh.dirtyPages.FlushData(); err != nil { + glog.Errorf("%v doFlush: %v", fileFullPath, err) + return fuse.EIO + } + + if !fh.dirtyMetadata { + return fuse.OK + } + + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + + entry := fh.entry + if entry == nil { + return nil + } + + if entry.Attributes != nil { + entry.Attributes.Mime = fh.contentType + if entry.Attributes.Uid == 0 { + entry.Attributes.Uid = uid + } + if entry.Attributes.Gid == 0 { + entry.Attributes.Gid = gid + } + if entry.Attributes.Crtime == 0 { + entry.Attributes.Crtime = time.Now().Unix() + } + entry.Attributes.Mtime = time.Now().Unix() + entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions() + } + + request := &filer_pb.CreateEntryRequest{ + Directory: string(dir), + Entry: entry, + Signatures: []int32{wfs.signature}, + } + + glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.Chunks)) + for i, chunk := range entry.Chunks { + glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + + manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) + + chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks) + chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", manifestErr) + } + entry.Chunks = append(chunks, manifestChunks...) + + wfs.mapPbIdFromLocalToFiler(request.Entry) + defer wfs.mapPbIdFromFilerToLocal(request.Entry) + + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.Errorf("fh flush create %s: %v", fileFullPath, err) + return fmt.Errorf("fh flush create %s: %v", fileFullPath, err) + } + + wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + + return nil + }) + + if err == nil { + fh.dirtyMetadata = false + } + + if err != nil { + glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.handle, err) + return fuse.EIO + } + + return fuse.OK +} |
