aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-13 23:27:11 -0800
committerchrislu <chris.lu@gmail.com>2022-02-13 23:27:11 -0800
commit7286e525ad85dec877d506908a0ff35590b0f357 (patch)
treef2735df7a9c73795265ea95c5bff73b36e36d21c
parent2b955c171345334a4034888c69547662150ceb91 (diff)
downloadseaweedfs-7286e525ad85dec877d506908a0ff35590b0f357.tar.xz
seaweedfs-7286e525ad85dec877d506908a0ff35590b0f357.zip
support write
-rw-r--r--weed/mount/filehandle.go1
-rw-r--r--weed/mount/weedfs_file_sync.go114
-rw-r--r--weed/mount/weedfs_file_write.go33
3 files changed, 145 insertions, 3 deletions
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 0d5481b30..f2a2ec69c 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -21,6 +21,7 @@ type FileHandle struct {
wfs *WFS
// cache file has been written to
+ dirtyMetadata bool
dirtyPages *PageWriter
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 1b89c1ecb..29a13690b 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -1,7 +1,14 @@
package mount
import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
+ "os"
+ "time"
)
/**
@@ -43,7 +50,15 @@ import (
* [close]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html
*/
func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status {
- return fuse.ENOSYS
+ fh := wfs.GetHandle(FileHandleId(in.Fh))
+ if fh == nil {
+ return fuse.ENOENT
+ }
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ return wfs.doFlush(fh, in.Uid, in.Gid)
}
/**
@@ -66,5 +81,100 @@ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status {
* @param fi file information
*/
func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Status) {
- return fuse.ENOSYS
+
+ fh := wfs.GetHandle(FileHandleId(in.Fh))
+ if fh == nil {
+ return fuse.ENOENT
+ }
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ return wfs.doFlush(fh, in.Uid, in.Gid)
+
+}
+
+func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
+ // flush works at fh level
+ fileFullPath := fh.FullPath()
+ dir, _ := fileFullPath.DirAndName()
+ // send the data to the OS
+ glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.handle)
+
+ if err := fh.dirtyPages.FlushData(); err != nil {
+ glog.Errorf("%v doFlush: %v", fileFullPath, err)
+ return fuse.EIO
+ }
+
+ if !fh.dirtyMetadata {
+ return fuse.OK
+ }
+
+ err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+
+ entry := fh.entry
+ if entry == nil {
+ return nil
+ }
+
+ if entry.Attributes != nil {
+ entry.Attributes.Mime = fh.contentType
+ if entry.Attributes.Uid == 0 {
+ entry.Attributes.Uid = uid
+ }
+ if entry.Attributes.Gid == 0 {
+ entry.Attributes.Gid = gid
+ }
+ if entry.Attributes.Crtime == 0 {
+ entry.Attributes.Crtime = time.Now().Unix()
+ }
+ entry.Attributes.Mtime = time.Now().Unix()
+ entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ wfs.option.Umask)
+ entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
+ }
+
+ request := &filer_pb.CreateEntryRequest{
+ Directory: string(dir),
+ Entry: entry,
+ Signatures: []int32{wfs.signature},
+ }
+
+ glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.Chunks))
+ for i, chunk := range entry.Chunks {
+ glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
+
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
+
+ chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks)
+ chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+ }
+ entry.Chunks = append(chunks, manifestChunks...)
+
+ wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer wfs.mapPbIdFromFilerToLocal(request.Entry)
+
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.Errorf("fh flush create %s: %v", fileFullPath, err)
+ return fmt.Errorf("fh flush create %s: %v", fileFullPath, err)
+ }
+
+ wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+
+ return nil
+ })
+
+ if err == nil {
+ fh.dirtyMetadata = false
+ }
+
+ if err != nil {
+ glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.handle, err)
+ return fuse.EIO
+ }
+
+ return fuse.OK
}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 72152d72e..efdf39386 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -2,6 +2,7 @@ package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
+ "net/http"
)
/**
@@ -31,5 +32,35 @@ import (
* @param fi file information
*/
func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (written uint32, code fuse.Status) {
- return 0, fuse.ENOSYS
+
+ fh := wfs.GetHandle(FileHandleId(in.Fh))
+ if fh == nil {
+ return 0, fuse.ENOENT
+ }
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ entry := fh.entry
+ if entry == nil {
+ return 0, fuse.OK
+ }
+
+ entry.Content = nil
+ offset := int64(in.Offset)
+ entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
+ // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
+
+ fh.dirtyPages.AddPage(offset, data)
+
+ written = uint32(len(data))
+
+ if offset == 0 {
+ // detect mime type
+ fh.contentType = http.DetectContentType(data)
+ }
+
+ fh.dirtyMetadata = true
+
+ return written, fuse.OK
}