diff options
Diffstat (limited to 'weed/mount/page_writer/upload_pipeline.go')
| -rw-r--r-- | weed/mount/page_writer/upload_pipeline.go | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 252dddc06..6065f2f76 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" + "math" "sync" "sync/atomic" ) @@ -55,7 +56,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, return t } -func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) { +func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) { + up.chunksLock.Lock() defer up.chunksLock.Unlock() @@ -65,33 +67,39 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n if !found { if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit - fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) - for lci, mc := range up.writableChunks { - chunkFullness := mc.WrittenSize() - if fullness < chunkFullness { - fullestChunkIndex = lci - fullness = chunkFullness + laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64) + for wci, wc := range up.writableChunks { + activityScore := wc.ActivityScore() + if lowestActivityScore > activityScore { + laziestChunkIndex = wci + lowestActivityScore = activityScore } } - up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) - // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) + up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex) + // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs) } if isSequential && len(up.writableChunks) < up.writableChunkLimit && atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { - pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) + pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex) } up.writableChunks[logicChunkIndex] = pageChunk } - n = pageChunk.WriteDataAt(p, off) + //if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed { + // println("found already sealed chunk", logicChunkIndex) + //} + //if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading { + // println("found active read chunk", logicChunkIndex) + //} + n = pageChunk.WriteDataAt(p, off, tsNs) up.maybeMoveToSealed(pageChunk, logicChunkIndex) return } -func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { +func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) up.chunksLock.Lock() @@ -103,12 +111,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { // read from sealed chunks first sealedChunk, found := up.sealedChunks[logicChunkIndex] if found { - sealedChunk.referenceCounter++ - } - if found { - maxStop = sealedChunk.chunk.ReadDataAt(p, off) + maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) - sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) } // read from writable chunks last @@ -116,7 +120,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { if !found { return } - writableMaxStop := writableChunk.ReadDataAt(p, off) + writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs) glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) maxStop = max(maxStop, writableMaxStop) |
