aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-23 23:02:05 -0800
committerchrislu <chris.lu@gmail.com>2022-01-23 23:02:05 -0800
commit3bba2124ef346e90f6f2b5378ad6ce63c7f2226e (patch)
tree6e7660d70240685f4f82149f526e0efef370011e
parent520591e6ea825fcf33bb3dcfe40d0d1daf312389 (diff)
downloadseaweedfs-3bba2124ef346e90f6f2b5378ad6ce63c7f2226e.tar.xz
seaweedfs-3bba2124ef346e90f6f2b5378ad6ce63c7f2226e.zip
use a sliding window of in-memory writable chunks
-rw-r--r--weed/filesys/page_writer/chunk_interval_list.go6
-rw-r--r--weed/filesys/page_writer/page_chunk.go1
-rw-r--r--weed/filesys/page_writer/page_chunk_mem.go4
-rw-r--r--weed/filesys/page_writer/page_chunk_swapfile.go4
-rw-r--r--weed/filesys/page_writer/upload_pipeline.go14
5 files changed, 26 insertions, 3 deletions
diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go
index dca9a1740..e6dc5d1f5 100644
--- a/weed/filesys/page_writer/chunk_interval_list.go
+++ b/weed/filesys/page_writer/chunk_interval_list.go
@@ -51,6 +51,12 @@ func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64)
func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
return list.size() == 1 && list.head.next.isComplete(chunkSize)
}
+func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
+ for t := list.head; t != nil; t = t.next {
+ writtenByteCount += t.Size()
+ }
+ return
+}
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go
index d1f3a5745..4e8f31425 100644
--- a/weed/filesys/page_writer/page_chunk.go
+++ b/weed/filesys/page_writer/page_chunk.go
@@ -11,5 +11,6 @@ type PageChunk interface {
WriteDataAt(src []byte, offset int64) (n int)
ReadDataAt(p []byte, off int64) (maxStop int64)
IsComplete() bool
+ WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc)
}
diff --git a/weed/filesys/page_writer/page_chunk_mem.go b/weed/filesys/page_writer/page_chunk_mem.go
index 887993eea..dfd54c19e 100644
--- a/weed/filesys/page_writer/page_chunk_mem.go
+++ b/weed/filesys/page_writer/page_chunk_mem.go
@@ -53,6 +53,10 @@ func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
+func (mc *MemChunk) WrittenSize() int64 {
+ return mc.usage.WrittenSize()
+}
+
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
diff --git a/weed/filesys/page_writer/page_chunk_swapfile.go b/weed/filesys/page_writer/page_chunk_swapfile.go
index 44ad64297..486557629 100644
--- a/weed/filesys/page_writer/page_chunk_swapfile.go
+++ b/weed/filesys/page_writer/page_chunk_swapfile.go
@@ -101,6 +101,10 @@ func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
+func (sc *SwapFileChunk) WrittenSize() int64 {
+ return sc.usage.WrittenSize()
+}
+
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 47693c235..65b41e9fa 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -65,10 +65,18 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
if len(up.writableChunks) < 16 {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
- memChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
- if memChunk == nil {
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
+ for lci, mc := range up.writableChunks {
+ chunkFullness := mc.WrittenSize()
+ if fullness < chunkFullness {
+ fullestChunkIndex = lci
+ fullness = chunkFullness
+ }
}
+ up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
+ delete(up.writableChunks, fullestChunkIndex)
+ fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
+ memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
up.writableChunks[logicChunkIndex] = memChunk
}