diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2023-01-12 23:07:36 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-01-12 23:07:36 -0800 |
| commit | 1cd2e64aacc515beeddbe9fd64419740c04472f6 (patch) | |
| tree | 6f5fc1d9eab970ab2cff40bbd316b6d2b1a33395 | |
| parent | 2452f3b2f74d8254190cce5291b68862d67e9100 (diff) | |
| download | seaweedfs-1cd2e64aacc515beeddbe9fd64419740c04472f6.tar.xz seaweedfs-1cd2e64aacc515beeddbe9fd64419740c04472f6.zip | |
merge chunks during upload (#4130)
* merge chunks during upload
* fix test
| -rw-r--r-- | weed/filer/filechunk_section_test.go | 10 | ||||
| -rw-r--r-- | weed/mount/page_writer/page_chunk_mem.go | 14 | ||||
| -rw-r--r-- | weed/mount/page_writer/page_chunk_swapfile.go | 15 |
3 files changed, 31 insertions, 8 deletions
diff --git a/weed/filer/filechunk_section_test.go b/weed/filer/filechunk_section_test.go index e4536540b..7b76c8456 100644 --- a/weed/filer/filechunk_section_test.go +++ b/weed/filer/filechunk_section_test.go @@ -7,31 +7,31 @@ import ( func Test_removeGarbageChunks(t *testing.T) { section := NewFileChunkSection(0) - section.addChunk(&filer_pb.FileChunk{ + section.chunks = append(section.chunks, &filer_pb.FileChunk{ FileId: "0", Offset: 0, Size: 1, ModifiedTsNs: 0, }) - section.addChunk(&filer_pb.FileChunk{ + section.chunks = append(section.chunks, &filer_pb.FileChunk{ FileId: "1", Offset: 1, Size: 1, ModifiedTsNs: 1, }) - section.addChunk(&filer_pb.FileChunk{ + section.chunks = append(section.chunks, &filer_pb.FileChunk{ FileId: "2", Offset: 2, Size: 1, ModifiedTsNs: 2, }) - section.addChunk(&filer_pb.FileChunk{ + section.chunks = append(section.chunks, &filer_pb.FileChunk{ FileId: "3", Offset: 3, Size: 1, ModifiedTsNs: 3, }) - section.addChunk(&filer_pb.FileChunk{ + section.chunks = append(section.chunks, &filer_pb.FileChunk{ FileId: "4", Offset: 4, Size: 1, diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index cbd82c953..3928f7077 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -105,4 +105,18 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() { }) } + + for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { + startOffset := t.StartOffset + stopOffset := t.stopOffset + tsNs := t.TsNs + for t != mc.usage.tail && t.next.StartOffset == stopOffset { + stopOffset = t.next.stopOffset + t = t.next + tsNs = max(tsNs, t.TsNs) + } + reader := util.NewBytesReader(mc.buf[startOffset:stopOffset]) + saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+startOffset, stopOffset-startOffset, tsNs, func() { + }) + } } diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 10060bef9..a1d9c04fa 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -176,11 +176,20 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { } // println(sc.logicChunkIndex, "|", "save") for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { - data := mem.Allocate(int(t.Size())) - n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) + startOffset := t.StartOffset + stopOffset := t.stopOffset + tsNs := t.TsNs + for t != sc.usage.tail && t.next.StartOffset == stopOffset { + stopOffset = t.next.stopOffset + t = t.next + tsNs = max(tsNs, t.TsNs) + } + + data := mem.Allocate(int(stopOffset - startOffset)) + n, _ := sc.swapfile.file.ReadAt(data, startOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) if n > 0 { reader := util.NewBytesReader(data[:n]) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() { + saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+startOffset, int64(n), tsNs, func() { }) } mem.Free(data) |
