diff options
| author | ustuzhanin <55892859+ustuzhanin@users.noreply.github.com> | 2020-10-02 22:47:25 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-02 22:47:25 +0500 |
| commit | 3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch) | |
| tree | e0b42e531d18136d9e272258187a305690ee2b4d /weed/filesys/dirty_page.go | |
| parent | cbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff) | |
| parent | 9ab98fa912814686b3035a97b5173c1628fbc0fc (diff) | |
| download | seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip | |
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/filesys/dirty_page.go')
| -rw-r--r-- | weed/filesys/dirty_page.go | 52 |
1 files changed, 25 insertions, 27 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 46d20e466..1ab7d0961 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -25,17 +25,11 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { } } -func (pages *ContinuousDirtyPages) releaseResource() { -} - var counter = int32(0) func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { - pages.lock.Lock() - defer pages.lock.Unlock() - - glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. @@ -85,14 +79,6 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chun return } -func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.saveExistingPagesToStorage() -} - func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { var hasSavedData bool @@ -106,7 +92,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer } if err == nil { - chunks = append(chunks, chunk) + if chunk != nil { + chunks = append(chunks, chunk) + } } else { return } @@ -121,14 +109,21 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi return nil, false, nil } + fileSize := int64(pages.f.entry.Attributes.FileSize) for { - chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) + chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) + if chunkSize == 0 { + return + } + chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) if err == nil { - hasSavedData = true - glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) + if chunk != nil { + hasSavedData = true + } + glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize) return } else { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err) time.Sleep(5 * time.Second) } } @@ -139,6 +134,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, dir, _ := pages.f.fullpath().DirAndName() + reader = io.LimitReader(reader, size) chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) if err != nil { return nil, err @@ -149,6 +145,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, } +func maxUint64(x, y uint64) uint64 { + if x > y { + return x + } + return y +} + func max(x, y int64) int64 { if x > y { return x @@ -162,11 +165,6 @@ func min(x, y int64) int64 { return y } -func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.intervals.ReadData(data, startOffset) - +func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + return pages.intervals.ReadDataAt(data, startOffset) } |
