about summary refs log tree commit diff
path: root/weed/mount/page_writer/upload_pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/page_writer/upload_pipeline.go')
-rw-r--r--weed/mount/page_writer/upload_pipeline.go23
1 file changed, 15 insertions, 8 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 3ed966de4..190076a2b 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -25,6 +25,7 @@ type UploadPipeline struct {
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
bufferChunkLimit int
+ swapFile *SwapFile
}
type SealedChunk struct {
@@ -40,7 +41,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
}
}
-func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
+func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
@@ -50,6 +51,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
bufferChunkLimit: bufferChunkLimit,
+ swapFile: NewSwapFile(swapFileDir, chunkSize),
}
}
@@ -59,10 +61,14 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
- memChunk, found := up.writableChunks[logicChunkIndex]
+ pageChunk, found := up.writableChunks[logicChunkIndex]
if !found {
- if len(up.writableChunks) < up.bufferChunkLimit {
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) {
+ // if total number of chunks is over 4 times of per file buffer count limit
+ pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+ } else if len(up.writableChunks) < up.bufferChunkLimit {
+ // if current file chunks is still under the per file buffer count limit
+ pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
@@ -75,12 +81,12 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
- up.writableChunks[logicChunkIndex] = memChunk
+ up.writableChunks[logicChunkIndex] = pageChunk
}
- n = memChunk.WriteDataAt(p, off)
- up.maybeMoveToSealed(memChunk, logicChunkIndex)
+ n = pageChunk.WriteDataAt(p, off)
+ up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return
}
@@ -179,6 +185,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
}
func (up *UploadPipeline) Shutdown() {
+ up.swapFile.FreeResource()
for logicChunkIndex, sealedChunk := range up.sealedChunks {
sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex))
}