aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/page_writer/upload_pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/page_writer/upload_pipeline.go')
-rw-r--r--weed/mount/page_writer/upload_pipeline.go40
1 files changed, 22 insertions, 18 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 252dddc06..6065f2f76 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "math"
"sync"
"sync/atomic"
)
@@ -55,7 +56,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
return t
}
-func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) {
+
up.chunksLock.Lock()
defer up.chunksLock.Unlock()
@@ -65,33 +67,39 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
if !found {
if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit
- fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
- for lci, mc := range up.writableChunks {
- chunkFullness := mc.WrittenSize()
- if fullness < chunkFullness {
- fullestChunkIndex = lci
- fullness = chunkFullness
+ laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
+ for wci, wc := range up.writableChunks {
+ activityScore := wc.ActivityScore()
+ if lowestActivityScore > activityScore {
+ laziestChunkIndex = wci
+ lowestActivityScore = activityScore
}
}
- up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
- // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
+ up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex)
+ // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
}
if isSequential &&
len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
- pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+ pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex)
}
up.writableChunks[logicChunkIndex] = pageChunk
}
- n = pageChunk.WriteDataAt(p, off)
+ //if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed {
+ // println("found already sealed chunk", logicChunkIndex)
+ //}
+ //if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading {
+ // println("found active read chunk", logicChunkIndex)
+ //}
+ n = pageChunk.WriteDataAt(p, off, tsNs)
up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return
}
-func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
up.chunksLock.Lock()
@@ -103,12 +111,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
// read from sealed chunks first
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
- sealedChunk.referenceCounter++
- }
- if found {
- maxStop = sealedChunk.chunk.ReadDataAt(p, off)
+ maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
- sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
@@ -116,7 +120,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
if !found {
return
}
- writableMaxStop := writableChunk.ReadDataAt(p, off)
+ writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)