Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dirty_page.go  | 152
-rw-r--r--  weed/filesys/file.go        |  11
-rw-r--r--  weed/filesys/filehandle.go  |  75
3 files changed, 182 insertions(+), 56 deletions(-)
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
new file mode 100644
index 000000000..bfb73f3b0
--- /dev/null
+++ b/weed/filesys/dirty_page.go
@@ -0,0 +1,152 @@
+package filesys
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
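+// DirtyPage is one run of written data kept in memory, starting at Offset.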
+type DirtyPage struct {
+ Offset int64
+ Data []byte
+}
+
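+// ContinuousDirtyPages collects the dirty pages of a single file. As long as
+// writes stay contiguous they are buffered here and uploaded as one chunk.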
+type ContinuousDirtyPages struct {
+ sync.Mutex
+
+ pages []*DirtyPage
+ f *File
+}
+
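+// AddPage buffers one write. A write that perfectly extends or replaces the
+// buffered pages is kept in memory; any other write first saves the buffered
+// pages as a chunk (returned to the caller) and then starts a new buffer.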
+func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunk *filer_pb.FileChunk, err error) {
+ pages.Lock()
+ defer pages.Unlock()
+
+ isPerfectAppend := len(pages.pages) == 0
+ if len(pages.pages) > 0 {
+ lastPage := pages.pages[len(pages.pages)-1]
+ if lastPage.Offset+int64(len(lastPage.Data)) == offset {
+ // write continuous pages
+ glog.V(3).Infof("%s/%s append [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
+ isPerfectAppend = true
+ }
+ }
+
+ isPerfectReplace := false
+ for _, page := range pages.pages {
+ if page.Offset == offset && len(page.Data) == len(data) {
+ // perfect replace
+ glog.V(3).Infof("%s/%s replace [%d,%d)", pages.f.dir.Path, pages.f.Name, offset, offset+int64(len(data)))
+ page.Data = data
+ isPerfectReplace = true
+ }
+ }
+
+ if isPerfectReplace {
+ return nil, nil
+ }
+
+ if isPerfectAppend {
+ pages.pages = append(pages.pages, &DirtyPage{
+ Offset: offset,
+ Data: data,
+ })
+ return nil, nil
+ }
+
+ chunk, err = pages.saveToStorage(ctx)
+
+ if err == nil && chunk != nil {
+ glog.V(3).Infof("%s/%s saved [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
+
+ pages.pages = []*DirtyPage{{
+ Offset: offset,
+ Data: data,
+ }}
+
+ return
+}
+
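+// FlushToStorage uploads whatever is buffered as one chunk and clears the buffer.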
+func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+
+ pages.Lock()
+ defer pages.Unlock()
+
+ if chunk, err = pages.saveToStorage(ctx); err == nil {
+ pages.pages = nil
+ }
+ return
+}
+
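+// totalSize returns the number of buffered bytes.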
+func (pages *ContinuousDirtyPages) totalSize() (total int64) {
+ for _, page := range pages.pages {
+ total += int64(len(page.Data))
+ }
+ return
+}
+
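+// saveToStorage asks the filer to assign a volume, uploads all buffered pages
+// as a single object, and returns the resulting FileChunk. Callers must hold
+// the lock.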
+func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+
+ if len(pages.pages) == 0 {
+ return nil, nil
+ }
+
+ sort.Slice(pages.pages, func(i, j int) bool {
+ return pages.pages[i].Offset < pages.pages[j].Offset
+ })
+
+ var fileId, host string
+
+ if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: "000",
+ Collection: "",
+ }
+
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+
+ fileId, host = resp.FileId, resp.Url
+
+ return nil
+ }); err != nil {
+ return nil, fmt.Errorf("filer assign volume: %v", err)
+ }
+
+ var readers []io.Reader
+ for _, page := range pages.pages {
+ readers = append(readers, bytes.NewReader(page.Data))
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ bufReader := io.MultiReader(readers...)
+ uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
+ return nil, fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
+ return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ return &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: pages.pages[0].Offset,
+ Size: uint64(pages.totalSize()),
+ Mtime: time.Now().UnixNano(),
+ }, nil
+
+}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 7ea14cc49..255fe4af0 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -84,11 +84,12 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
file.isOpen = true
return &FileHandle{
- f: file,
- RequestId: req.Header.ID,
- NodeId: req.Header.Node,
- Uid: req.Uid,
- Gid: req.Gid,
+ f: file,
+ dirtyPages: &ContinuousDirtyPages{f: file},
+ RequestId: req.Header.ID,
+ NodeId: req.Header.Node,
+ Uid: req.Uid,
+ Gid: req.Gid,
}, nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index d3c8ec796..52df2824c 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -3,22 +3,20 @@ package filesys
import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
- "bytes"
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
"sync"
- "time"
)
type FileHandle struct {
// cache file has been written to
- dirty bool
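+ // dirtyPages buffers written data not yet uploaded to a volume server;
+ // dirtyMetadata records that the filer entry needs updating on flush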
+ dirtyPages *ContinuousDirtyPages
+ dirtyMetadata bool
cachePath string
@@ -128,55 +126,20 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
// write the request to volume servers
- glog.V(3).Infof("%+v/%v write fh: %+v", fh.f.dir.Path, fh.f.Name, req)
+ glog.V(3).Infof("%+v/%v write fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)))
- var fileId, host string
-
- if err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: "000",
- Collection: "",
- }
-
- resp, err := client.AssignVolume(ctx, request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
-
- fileId, host = resp.FileId, resp.Url
-
- return nil
- }); err != nil {
- return fmt.Errorf("filer assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- bufReader := bytes.NewReader(req.Data)
- uploadResult, err := operation.Upload(fileUrl, fh.f.Name, bufReader, false, "application/octet-stream", nil, "")
+ chunk, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", req, fileUrl, err)
- return fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", req, fileUrl, err)
- return fmt.Errorf("upload result: %v", uploadResult.Error)
+ return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
}
- resp.Size = int(uploadResult.Size)
-
- fh.f.Chunks = append(fh.f.Chunks, &filer_pb.FileChunk{
- FileId: fileId,
- Offset: req.Offset,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- })
-
- glog.V(1).Infof("uploaded %s/%s to: %v, [%d,%d)", fh.f.dir.Path, fh.f.Name, fileUrl, req.Offset, req.Offset+int64(resp.Size))
+ resp.Size = len(req.Data)
- fh.dirty = true
+ if chunk != nil {
+ fh.f.Chunks = append(fh.f.Chunks, chunk)
+ glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ fh.dirtyMetadata = true
+ }
return nil
}
@@ -197,7 +160,17 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
// send the data to the OS
glog.V(3).Infof("%s/%s fh flush %v", fh.f.dir.Path, fh.f.Name, req)
- if !fh.dirty {
+ chunk, err := fh.dirtyPages.FlushToStorage(ctx)
+ if err != nil {
+ glog.V(0).Infof("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ }
+ if chunk != nil {
+ fh.f.Chunks = append(fh.f.Chunks, chunk)
+ fh.dirtyMetadata = true
+ }
+
+ if !fh.dirtyMetadata {
return nil
}
@@ -206,7 +179,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return nil
}
- err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: fh.f.dir.Path,
@@ -229,7 +202,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
})
if err == nil {
- fh.dirty = false
+ fh.dirtyMetadata = false
}
return err