aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-03-09 22:26:51 -0800
committerchrislu <chris.lu@gmail.com>2022-03-09 22:26:51 -0800
commit011a41b561ed3c8c496aead94f17c8f55d25429d (patch)
tree305a21de9bec7bae370cf0ecca8a56efe20f37e7
parentfc0a583a4d2ffdc0d44ac640620bb4e09b31f9f5 (diff)
downloadseaweedfs-011a41b561ed3c8c496aead94f17c8f55d25429d.tar.xz
seaweedfs-011a41b561ed3c8c496aead94f17c8f55d25429d.zip
add back writes to swap file when too many in memory chunks are used.
-rw-r--r--weed/mount/dirty_pages_chunked.go5
-rw-r--r--weed/mount/page_writer.go1
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go5
-rw-r--r--weed/mount/page_writer/upload_pipeline.go23
-rw-r--r--weed/mount/page_writer/upload_pipeline_test.go2
-rw-r--r--weed/mount/weedfs.go11
6 files changed, 34 insertions, 13 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index ca44df227..a76c6dd61 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -30,7 +30,10 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
fh: fh,
}
- dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters)
+ swapFileDir := fh.wfs.option.getTempFilePageDir()
+
+ dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize,
+ dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters, swapFileDir)
return dirtyPages
}
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index 9da892f00..ffcaa6398 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -23,7 +23,6 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
fh: fh,
chunkSize: chunkSize,
randomWriter: newMemoryChunkPages(fh, chunkSize),
- // randomWriter: newTempFileDirtyPages(fh.f, chunkSize),
}
return pw
}
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
index dfd54c19e..52db6d4f9 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -3,10 +3,13 @@ package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
+ "sync/atomic"
)
var (
_ = PageChunk(&MemChunk{})
+
+ memChunkCounter int64
)
type MemChunk struct {
@@ -17,6 +20,7 @@ type MemChunk struct {
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
+ atomic.AddInt64(&memChunkCounter, 1)
return &MemChunk{
logicChunkIndex: logicChunkIndex,
chunkSize: chunkSize,
@@ -26,6 +30,7 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
}
func (mc *MemChunk) FreeResource() {
+ atomic.AddInt64(&memChunkCounter, -1)
mem.Free(mc.buf)
}
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 3ed966de4..190076a2b 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -25,6 +25,7 @@ type UploadPipeline struct {
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
bufferChunkLimit int
+ swapFile *SwapFile
}
type SealedChunk struct {
@@ -40,7 +41,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
}
}
-func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
+func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
@@ -50,6 +51,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
bufferChunkLimit: bufferChunkLimit,
+ swapFile: NewSwapFile(swapFileDir, chunkSize),
}
}
@@ -59,10 +61,14 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
- memChunk, found := up.writableChunks[logicChunkIndex]
+ pageChunk, found := up.writableChunks[logicChunkIndex]
if !found {
- if len(up.writableChunks) < up.bufferChunkLimit {
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) {
+ // if total number of chunks is over 4 times of per file buffer count limit
+ pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+ } else if len(up.writableChunks) < up.bufferChunkLimit {
+ // if current file chunks is still under the per file buffer count limit
+ pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
@@ -75,12 +81,12 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
- up.writableChunks[logicChunkIndex] = memChunk
+ up.writableChunks[logicChunkIndex] = pageChunk
}
- n = memChunk.WriteDataAt(p, off)
- up.maybeMoveToSealed(memChunk, logicChunkIndex)
+ n = pageChunk.WriteDataAt(p, off)
+ up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return
}
@@ -179,6 +185,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
}
func (up *UploadPipeline) Shutdown() {
+ up.swapFile.FreeResource()
for logicChunkIndex, sealedChunk := range up.sealedChunks {
sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex))
}
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
index 816fb228b..63b60faaf 100644
--- a/weed/mount/page_writer/upload_pipeline_test.go
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -7,7 +7,7 @@ import (
func TestUploadPipeline(t *testing.T) {
- uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
+ uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16, "")
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 52eefab32..260a4e1da 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -16,6 +16,7 @@ import (
"math/rand"
"os"
"path"
+ "path/filepath"
"time"
"github.com/hanwen/go-fuse/v2/fs"
@@ -50,7 +51,8 @@ type Option struct {
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
- uniqueCacheDir string
+ uniqueCacheDir string
+ uniqueCacheTempPageDir string
}
type WFS struct {
@@ -177,7 +179,12 @@ func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
- os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask)
+ option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")
+ os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
+}
+
+func (option *Option) getTempFilePageDir() string {
+ return option.uniqueCacheTempPageDir
}
func (option *Option) getUniqueCacheDir() string {