aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2023-01-12 23:07:36 -0800
committerGitHub <noreply@github.com>2023-01-12 23:07:36 -0800
commit1cd2e64aacc515beeddbe9fd64419740c04472f6 (patch)
tree6f5fc1d9eab970ab2cff40bbd316b6d2b1a33395
parent2452f3b2f74d8254190cce5291b68862d67e9100 (diff)
downloadseaweedfs-1cd2e64aacc515beeddbe9fd64419740c04472f6.tar.xz
seaweedfs-1cd2e64aacc515beeddbe9fd64419740c04472f6.zip
merge chunks during upload (#4130)
* merge chunks during upload * fix test
-rw-r--r--weed/filer/filechunk_section_test.go10
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go14
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go15
3 files changed, 31 insertions, 8 deletions
diff --git a/weed/filer/filechunk_section_test.go b/weed/filer/filechunk_section_test.go
index e4536540b..7b76c8456 100644
--- a/weed/filer/filechunk_section_test.go
+++ b/weed/filer/filechunk_section_test.go
@@ -7,31 +7,31 @@ import (
func Test_removeGarbageChunks(t *testing.T) {
section := NewFileChunkSection(0)
- section.addChunk(&filer_pb.FileChunk{
+ section.chunks = append(section.chunks, &filer_pb.FileChunk{
FileId: "0",
Offset: 0,
Size: 1,
ModifiedTsNs: 0,
})
- section.addChunk(&filer_pb.FileChunk{
+ section.chunks = append(section.chunks, &filer_pb.FileChunk{
FileId: "1",
Offset: 1,
Size: 1,
ModifiedTsNs: 1,
})
- section.addChunk(&filer_pb.FileChunk{
+ section.chunks = append(section.chunks, &filer_pb.FileChunk{
FileId: "2",
Offset: 2,
Size: 1,
ModifiedTsNs: 2,
})
- section.addChunk(&filer_pb.FileChunk{
+ section.chunks = append(section.chunks, &filer_pb.FileChunk{
FileId: "3",
Offset: 3,
Size: 1,
ModifiedTsNs: 3,
})
- section.addChunk(&filer_pb.FileChunk{
+ section.chunks = append(section.chunks, &filer_pb.FileChunk{
FileId: "4",
Offset: 4,
Size: 1,
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
index cbd82c953..3928f7077 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -105,4 +105,18 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() {
})
}
+
+ for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
+ startOffset := t.StartOffset
+ stopOffset := t.stopOffset
+ tsNs := t.TsNs
+ for t != mc.usage.tail && t.next.StartOffset == stopOffset {
+ stopOffset = t.next.stopOffset
+ t = t.next
+ tsNs = max(tsNs, t.TsNs)
+ }
+ reader := util.NewBytesReader(mc.buf[startOffset:stopOffset])
+ saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+startOffset, stopOffset-startOffset, tsNs, func() {
+ })
+ }
}
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index 10060bef9..a1d9c04fa 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -176,11 +176,20 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
}
// println(sc.logicChunkIndex, "|", "save")
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
- data := mem.Allocate(int(t.Size()))
- n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
+ startOffset := t.StartOffset
+ stopOffset := t.stopOffset
+ tsNs := t.TsNs
+ for t != sc.usage.tail && t.next.StartOffset == stopOffset {
+ stopOffset = t.next.stopOffset
+ t = t.next
+ tsNs = max(tsNs, t.TsNs)
+ }
+
+ data := mem.Allocate(int(stopOffset - startOffset))
+ n, _ := sc.swapfile.file.ReadAt(data, startOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
if n > 0 {
reader := util.NewBytesReader(data[:n])
- saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() {
+ saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+startOffset, int64(n), tsNs, func() {
})
}
mem.Free(data)