diff options
Diffstat (limited to 'weed/filesys/dirty_page.go')
| -rw-r--r-- | weed/filesys/dirty_page.go | 52 |
1 files changed, 25 insertions, 27 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 46d20e466..1ab7d0961 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -25,17 +25,11 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { } } -func (pages *ContinuousDirtyPages) releaseResource() { -} - var counter = int32(0) func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { - pages.lock.Lock() - defer pages.lock.Unlock() - - glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. @@ -85,14 +79,6 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chun return } -func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.saveExistingPagesToStorage() -} - func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { var hasSavedData bool @@ -106,7 +92,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer } if err == nil { - chunks = append(chunks, chunk) + if chunk != nil { + chunks = append(chunks, chunk) + } } else { return } @@ -121,14 +109,21 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi return nil, false, nil } + fileSize := int64(pages.f.entry.Attributes.FileSize) for { - chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) + chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) + if chunkSize == 0 { + return + } + chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) if err == nil { - hasSavedData = true - glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) + if chunk != nil { + hasSavedData = true + } + glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize) return } else { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err) time.Sleep(5 * time.Second) } } @@ -139,6 +134,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, dir, _ := pages.f.fullpath().DirAndName() + reader = io.LimitReader(reader, size) chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) if err != nil { return nil, err @@ -149,6 +145,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, } +func maxUint64(x, y uint64) uint64 { + if x > y { + return x + } + return y +} + func max(x, y int64) int64 { if x > y { return x @@ -162,11 +165,6 @@ func min(x, y int64) int64 { return y } -func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.intervals.ReadData(data, startOffset) - +func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + return pages.intervals.ReadDataAt(data, startOffset) } |
