| author | jenkins.ow <jenkins@outwardinc.com> | 2018-09-11 12:38:10 -0700 |
|---|---|---|
| committer | jenkins.ow <jenkins@outwardinc.com> | 2018-09-11 12:38:10 -0700 |
| commit | 1690a080b2298ca8427d204994de68fff010e146 | |
| tree | 28f811a3a8e6ce68fb882fc7f5859011086ebd46 | /weed/filesys |
| parent | bc025d53055066d20ee6cf02ff4d7a30527831fe | |
| parent | 267201ff44d58d339ad2c9006ffe1d6d65e569b3 | |
| download | seaweedfs-1690a080b2298ca8427d204994de68fff010e146.tar.xz, seaweedfs-1690a080b2298ca8427d204994de68fff010e146.zip | |
Merge branch 'master' of https://github.com/hans-strudle/seaweedfs
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dirty_page.go | 93 |
| -rw-r--r-- | weed/filesys/filehandle.go | 5 |
2 files changed, 66 insertions, 32 deletions
```diff
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index f4e47950e..145d89138 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,6 +9,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"sync"
 )
 
 type ContinuousDirtyPages struct {
@@ -17,6 +18,7 @@ type ContinuousDirtyPages struct {
 	Size   int64
 	Data   []byte
 	f      *File
+	lock   sync.Mutex
 }
 
 func newDirtyPages(file *File) *ContinuousDirtyPages {
@@ -28,35 +30,14 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
 
 func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 
+	pages.lock.Lock()
+	defer pages.lock.Unlock()
+
 	var chunk *filer_pb.FileChunk
 
 	if len(data) > len(pages.Data) {
 		// this is more than what buffer can hold.
-
-		// flush existing
-		if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
-			if chunk != nil {
-				glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
-			}
-			chunks = append(chunks, chunk)
-		} else {
-			glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
-			return
-		}
-		pages.Size = 0
-
-		// flush the big page
-		if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
-			if chunk != nil {
-				glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
-				chunks = append(chunks, chunk)
-			}
-		} else {
-			glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
-			return
-		}
-
-		return
+		return pages.flushAndSave(ctx, offset, data)
 	}
 
 	if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
@@ -77,25 +58,74 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
 			return
 		}
 		pages.Offset = offset
-		pages.Size = int64(len(data))
 		copy(pages.Data, data)
+		pages.Size = int64(len(data))
 		return
 	}
 
-	copy(pages.Data[offset-pages.Offset:], data)
+	if offset != pages.Offset+pages.Size {
+		// when this happens, debug shows the data overlapping with existing data is empty
+		// the data is not just append
+		if offset == pages.Offset {
+			copy(pages.Data[pages.Size:], data[pages.Size:])
+		} else {
+			if pages.Size != 0 {
+				glog.V(0).Infof("possible error: pages [%d, %d) write [%d, %d)", pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
+			}
+			return pages.flushAndSave(ctx, offset, data)
+		}
+	} else {
+		copy(pages.Data[offset-pages.Offset:], data)
+	}
+
 	pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
 
 	return
 }
 
+func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+
+	var chunk *filer_pb.FileChunk
+
+	// flush existing
+	if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+		if chunk != nil {
+			glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+			chunks = append(chunks, chunk)
+		}
+	} else {
+		glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+		return
+	}
+	pages.Size = 0
+	pages.Offset = 0
+
+	// flush the new page
+	if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+		if chunk != nil {
+			glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+			chunks = append(chunks, chunk)
+		}
+	} else {
+		glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+		return
+	}
+
+	return
+}
+
 func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
 
+	pages.lock.Lock()
+	defer pages.lock.Unlock()
+
 	if pages.Size == 0 {
 		return nil, nil
 	}
 
 	if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
 		pages.Size = 0
+		pages.Offset = 0
 		if chunk != nil {
 			glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
 		}
@@ -104,15 +134,16 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
 }
 
 func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
-	return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
-}
-
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
 
 	if pages.Size == 0 {
 		return nil, nil
 	}
 
+	return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
+}
+
+func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
+
 	var fileId, host string
 
 	if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 0c13db984..786abbef2 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -140,6 +140,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 
 	chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
 	if err != nil {
+		glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
 		return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
 	}
 
@@ -179,7 +180,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 
 	chunk, err := fh.dirtyPages.FlushToStorage(ctx)
 	if err != nil {
-		glog.V(0).Infof("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+		glog.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
 		return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
 	}
 	if chunk != nil {
@@ -200,6 +201,8 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 
 	if fh.f.attributes != nil {
 		fh.f.attributes.Mime = fh.contentType
+		fh.f.attributes.Uid = req.Uid
+		fh.f.attributes.Gid = req.Gid
 	}
 
 	request := &filer_pb.UpdateEntryRequest{
```
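The heart of this commit is the `sync.Mutex` that serializes `AddPage` and `FlushToStorage`, plus the `flushAndSave` fallback for writes that do not append to the current dirty window. Below is a minimal, hypothetical sketch of that pattern; the names (`dirtyBuffer`, `flushLocked`) are invented for illustration and the real seaweedfs code additionally handles same-offset overwrites, buffer capacity, and returns the resulting `FileChunk`s.

```go
package main

import (
	"fmt"
	"sync"
)

// dirtyBuffer is a stripped-down analogue of ContinuousDirtyPages:
// one contiguous in-memory window of dirty bytes, guarded by a mutex
// so concurrent writers and flushers cannot interleave.
type dirtyBuffer struct {
	mu     sync.Mutex
	offset int64  // file offset of data[0]
	size   int64  // bytes currently buffered
	data   []byte // fixed-size buffer, like pages.Data
}

// AddPage buffers a write if it appends to the current window;
// a non-contiguous write forces a flush first, mirroring the
// flushAndSave fallback introduced in the commit.
func (b *dirtyBuffer) AddPage(offset int64, p []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.size == 0 {
		b.offset = offset
	} else if offset != b.offset+b.size {
		// Gap or overlap: flush the old window, start a new one.
		b.flushLocked()
		b.offset = offset
	}
	copy(b.data[b.size:], p)
	b.size += int64(len(p))
}

// Flush is the public entry point; it takes the same lock as AddPage.
func (b *dirtyBuffer) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

// flushLocked must be called with b.mu held. It resets both size and
// offset, as FlushToStorage now does after a successful save.
func (b *dirtyBuffer) flushLocked() {
	if b.size == 0 {
		return
	}
	fmt.Printf("flush [%d,%d)\n", b.offset, b.offset+b.size)
	b.size = 0
	b.offset = 0
}

func main() {
	b := &dirtyBuffer{data: make([]byte, 64)}
	b.AddPage(0, []byte("hello"))
	b.AddPage(5, []byte(" world")) // contiguous: buffered
	b.AddPage(100, []byte("!"))    // gap: forces a flush of [0,11)
	b.Flush()                      // flushes [100,101)
}
```

Note the split between `Flush` and `flushLocked`: since Go mutexes are not reentrant, `AddPage` (which already holds the lock) must call the unlocked variant, just as the patched `AddPage` calls `flushAndSave` under the lock it already holds.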
