aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorjenkins.ow <jenkins@outwardinc.com>2018-09-11 12:38:10 -0700
committerjenkins.ow <jenkins@outwardinc.com>2018-09-11 12:38:10 -0700
commit1690a080b2298ca8427d204994de68fff010e146 (patch)
tree28f811a3a8e6ce68fb882fc7f5859011086ebd46 /weed/filesys
parentbc025d53055066d20ee6cf02ff4d7a30527831fe (diff)
parent267201ff44d58d339ad2c9006ffe1d6d65e569b3 (diff)
downloadseaweedfs-1690a080b2298ca8427d204994de68fff010e146.tar.xz
seaweedfs-1690a080b2298ca8427d204994de68fff010e146.zip
Merge branch 'master' of https://github.com/hans-strudle/seaweedfs
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go93
-rw-r--r--weed/filesys/filehandle.go5
2 files changed, 66 insertions, 32 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index f4e47950e..145d89138 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "sync"
)
type ContinuousDirtyPages struct {
@@ -17,6 +18,7 @@ type ContinuousDirtyPages struct {
Size int64
Data []byte
f *File
+ lock sync.Mutex
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
@@ -28,35 +30,14 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+ pages.lock.Lock()
+ defer pages.lock.Unlock()
+
var chunk *filer_pb.FileChunk
if len(data) > len(pages.Data) {
// this is more than what buffer can hold.
-
- // flush existing
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
- chunks = append(chunks, chunk)
- } else {
- glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
- pages.Size = 0
-
- // flush the big page
- if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
-
- return
+ return pages.flushAndSave(ctx, offset, data)
}
if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
@@ -77,25 +58,74 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
return
}
pages.Offset = offset
- pages.Size = int64(len(data))
copy(pages.Data, data)
+ pages.Size = int64(len(data))
return
}
- copy(pages.Data[offset-pages.Offset:], data)
+ if offset != pages.Offset+pages.Size {
+ // when this happens, debug shows the data overlapping with existing data is empty
+ // the data is not just append
+ if offset == pages.Offset {
+ copy(pages.Data[pages.Size:], data[pages.Size:])
+ } else {
+ if pages.Size != 0 {
+ glog.V(0).Infof("possible error: pages [%d, %d) write [%d, %d)", pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
+ }
+ return pages.flushAndSave(ctx, offset, data)
+ }
+ } else {
+ copy(pages.Data[offset-pages.Offset:], data)
+ }
+
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
return
}
+func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+
+ var chunk *filer_pb.FileChunk
+
+ // flush existing
+ if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return
+ }
+ pages.Size = 0
+ pages.Offset = 0
+
+ // flush the new page
+ if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return
+ }
+
+ return
+}
+
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+ pages.lock.Lock()
+ defer pages.lock.Unlock()
+
if pages.Size == 0 {
return nil, nil
}
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
pages.Size = 0
+ pages.Offset = 0
if chunk != nil {
glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
@@ -104,15 +134,16 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
}
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
- return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
-}
-
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
if pages.Size == 0 {
return nil, nil
}
+ return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
+}
+
+func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
+
var fileId, host string
if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 0c13db984..786abbef2 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -140,6 +140,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
if err != nil {
+ glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
}
@@ -179,7 +180,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
if err != nil {
- glog.V(0).Infof("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ glog.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
}
if chunk != nil {
@@ -200,6 +201,8 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
if fh.f.attributes != nil {
fh.f.attributes.Mime = fh.contentType
+ fh.f.attributes.Uid = req.Uid
+ fh.f.attributes.Gid = req.Gid
}
request := &filer_pb.UpdateEntryRequest{