aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/page_writer/page_chunk.go1
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go7
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go6
-rw-r--r--weed/mount/page_writer/upload_pipeline.go26
4 files changed, 35 insertions, 5 deletions
diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go
index 32d246deb..ac1d24622 100644
--- a/weed/mount/page_writer/page_chunk.go
+++ b/weed/mount/page_writer/page_chunk.go
@@ -12,5 +12,6 @@ type PageChunk interface {
ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool
ActivityScore() int64
+ WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc)
}
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
index 1ec8cecb4..cbd82c953 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -86,6 +86,13 @@ func (mc *MemChunk) ActivityScore() int64 {
return mc.activityScore.ActivityScore()
}
+func (mc *MemChunk) WrittenSize() int64 {
+ mc.RLock()
+ defer mc.RUnlock()
+
+ return mc.usage.WrittenSize()
+}
+
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
mc.RLock()
defer mc.RUnlock()
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index 6cedc64df..10060bef9 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -161,6 +161,12 @@ func (sc *SwapFileChunk) ActivityScore() int64 {
return sc.activityScore.ActivityScore()
}
+func (sc *SwapFileChunk) WrittenSize() int64 {
+ sc.RLock()
+ defer sc.RUnlock()
+ return sc.usage.WrittenSize()
+}
+
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
sc.RLock()
defer sc.RUnlock()
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 6065f2f76..e1aa43fe2 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -4,7 +4,6 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
- "math"
"sync"
"sync/atomic"
)
@@ -67,15 +66,32 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
if !found {
if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit
- laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
+ candidateChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
+ for lci, mc := range up.writableChunks {
+ chunkFullness := mc.WrittenSize()
+ if fullness < chunkFullness {
+ candidateChunkIndex = lci
+ fullness = chunkFullness
+ }
+ }
+ /* // this algo generates too many chunks
+ candidateChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
for wci, wc := range up.writableChunks {
activityScore := wc.ActivityScore()
- if lowestActivityScore > activityScore {
- laziestChunkIndex = wci
+ if lowestActivityScore >= activityScore {
+ if lowestActivityScore == activityScore {
+ chunkFullness := wc.WrittenSize()
+ if fullness < chunkFullness {
+ candidateChunkIndex = lci
+ fullness = chunkFullness
+ }
+ }
+ candidateChunkIndex = wci
lowestActivityScore = activityScore
}
}
- up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex)
+ */
+ up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
}
if isSequential &&