aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-26 00:36:19 -0800
committerchrislu <chris.lu@gmail.com>2022-01-26 00:36:19 -0800
commit62d815d1ca9e6c40b3485cae151932be57f2006d (patch)
tree7544355eed2f163cdbf3ecef0d0f3637afc45a98
parent9596fce562eda2291c3e488095abec0c30b40600 (diff)
downloadseaweedfs-62d815d1ca9e6c40b3485cae151932be57f2006d.tar.xz
seaweedfs-62d815d1ca9e6c40b3485cae151932be57f2006d.zip
use limited in memory buffer instead of swap file
-rw-r--r--weed/filesys/dirty_pages_chunked.go5
-rw-r--r--weed/filesys/page_writer/upload_pipeline.go10
-rw-r--r--weed/filesys/page_writer/upload_pipeline_test.go2
3 files changed, 6 insertions, 11 deletions
diff --git a/weed/filesys/dirty_pages_chunked.go b/weed/filesys/dirty_pages_chunked.go
index 71da91666..002922958 100644
--- a/weed/filesys/dirty_pages_chunked.go
+++ b/weed/filesys/dirty_pages_chunked.go
@@ -31,10 +31,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
fh: fh,
}
- swapFileDir := fh.f.wfs.option.getTempFilePageDir()
-
- dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(),
- fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, swapFileDir)
+ dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.f.wfs.option.ConcurrentWriters)
return dirtyPages
}
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 65b41e9fa..53641e66d 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -24,7 +24,7 @@ type UploadPipeline struct {
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
- swapFile *SwapFile
+ bufferChunkLimit int
}
type SealedChunk struct {
@@ -40,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
}
}
-func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, swapFileDir string) *UploadPipeline {
+func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
@@ -48,9 +48,8 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
- filepath: filepath,
activeReadChunks: make(map[LogicChunkIndex]int),
- swapFile: NewSwapFile(swapFileDir, chunkSize),
+ bufferChunkLimit: bufferChunkLimit,
}
}
@@ -62,7 +61,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
memChunk, found := up.writableChunks[logicChunkIndex]
if !found {
- if len(up.writableChunks) < 16 {
+ if len(up.writableChunks) < up.bufferChunkLimit {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
@@ -180,5 +179,4 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
}
func (up *UploadPipeline) Shutdown() {
- up.swapFile.FreeResource()
}
diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go
index 5ecb677e8..816fb228b 100644
--- a/weed/filesys/page_writer/upload_pipeline_test.go
+++ b/weed/filesys/page_writer/upload_pipeline_test.go
@@ -7,7 +7,7 @@ import (
func TestUploadPipeline(t *testing.T) {
- uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil, "")
+ uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)