aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/dirty_page.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/dirty_page.go')
-rw-r--r--weed/filesys/dirty_page.go30
1 files changed, 24 insertions, 6 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index f4e47950e..dd080f721 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "sync"
)
type ContinuousDirtyPages struct {
@@ -17,6 +18,7 @@ type ContinuousDirtyPages struct {
Size int64
Data []byte
f *File
+ lock sync.Mutex
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
@@ -28,6 +30,9 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+ pages.lock.Lock()
+ defer pages.lock.Unlock()
+
var chunk *filer_pb.FileChunk
if len(data) > len(pages.Data) {
@@ -37,13 +42,14 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
if chunk != nil {
glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ chunks = append(chunks, chunk)
}
- chunks = append(chunks, chunk)
} else {
glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
return
}
pages.Size = 0
+ pages.Offset = 0
// flush the big page
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
@@ -77,12 +83,18 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
return
}
pages.Offset = offset
- pages.Size = int64(len(data))
copy(pages.Data, data)
+ pages.Size = int64(len(data))
return
}
- copy(pages.Data[offset-pages.Offset:], data)
+ if offset != pages.Offset+pages.Size {
+ // when this happens, debug shows the data overlapping with existing data is empty
+ // the data is not just append
+ copy(pages.Data[pages.Size:], data[pages.Offset+pages.Size-offset:])
+ } else {
+ copy(pages.Data[offset-pages.Offset:], data)
+ }
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
return
@@ -90,12 +102,16 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+ pages.lock.Lock()
+ defer pages.lock.Unlock()
+
if pages.Size == 0 {
return nil, nil
}
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
pages.Size = 0
+ pages.Offset = 0
if chunk != nil {
glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
@@ -104,14 +120,16 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
}
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+
+ if pages.Size == 0 {
+ return nil, nil
+ }
+
return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
}
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
- if pages.Size == 0 {
- return nil, nil
- }
var fileId, host string