aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/dirty_page.go
diff options
context:
space:
mode:
authorustuzhanin <55892859+ustuzhanin@users.noreply.github.com>2020-10-02 22:47:25 +0500
committerGitHub <noreply@github.com>2020-10-02 22:47:25 +0500
commit3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch)
treee0b42e531d18136d9e272258187a305690ee2b4d /weed/filesys/dirty_page.go
parentcbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff)
parent9ab98fa912814686b3035a97b5173c1628fbc0fc (diff)
downloadseaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz
seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/filesys/dirty_page.go')
-rw-r--r--weed/filesys/dirty_page.go52
1 files changed, 25 insertions, 27 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 46d20e466..1ab7d0961 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -25,17 +25,11 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
}
}
-func (pages *ContinuousDirtyPages) releaseResource() {
-}
-
var counter = int32(0)
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+ glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@@ -85,14 +79,6 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chun
return
}
-func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) {
-
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- return pages.saveExistingPagesToStorage()
-}
-
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
var hasSavedData bool
@@ -106,7 +92,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer
}
if err == nil {
- chunks = append(chunks, chunk)
+ if chunk != nil {
+ chunks = append(chunks, chunk)
+ }
} else {
return
}
@@ -121,14 +109,21 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
return nil, false, nil
}
+ fileSize := int64(pages.f.entry.Attributes.FileSize)
for {
- chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+ chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+ if chunkSize == 0 {
+ return
+ }
+ chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
if err == nil {
- hasSavedData = true
- glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+ if chunk != nil {
+ hasSavedData = true
+ }
+ glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize)
return
} else {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err)
time.Sleep(5 * time.Second)
}
}
@@ -139,6 +134,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
dir, _ := pages.f.fullpath().DirAndName()
+ reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
if err != nil {
return nil, err
@@ -149,6 +145,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
}
+func maxUint64(x, y uint64) uint64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
func max(x, y int64) int64 {
if x > y {
return x
@@ -162,11 +165,6 @@ func min(x, y int64) int64 {
return y
}
-func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
-
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- return pages.intervals.ReadData(data, startOffset)
-
+func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ return pages.intervals.ReadDataAt(data, startOffset)
}