aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filesys/dirty_page.go64
1 files changed, 35 insertions, 29 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 3b55d5c0b..1dd7d3fde 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -37,32 +37,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
if len(data) > len(pages.Data) {
// this is more than what buffer can hold.
-
- // flush existing
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
- pages.Size = 0
- pages.Offset = 0
-
- // flush the big page
- if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
-
- return
+ return pages.flushAndSave(ctx, offset, data)
}
if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
@@ -91,15 +66,46 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
if offset != pages.Offset+pages.Size {
// when this happens, debug shows the data overlapping with existing data is empty
// the data is not just append
- copy(pages.Data[pages.Size:], data[pages.Offset+pages.Size-offset:])
- } else {
- copy(pages.Data[offset-pages.Offset:], data)
+ return pages.flushAndSave(ctx, offset, data)
}
+
+ copy(pages.Data[offset-pages.Offset:], data)
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
return
}
+func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+
+ var chunk *filer_pb.FileChunk
+
+ // flush existing
+ if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return
+ }
+ pages.Size = 0
+ pages.Offset = 0
+
+ // flush the new page
+ if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+ if chunk != nil {
+ glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
+ }
+ } else {
+ glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ return
+ }
+
+ return
+}
+
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
pages.lock.Lock()