aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-15 07:40:29 -0800
committerchrislu <chris.lu@gmail.com>2022-01-15 07:40:29 -0800
commit1dc25218cdf59f26854c1610b51381269c372117 (patch)
treefe830829908f61cc84d4db830d023b113a1ebcc5
parent2c95008a1a650420b0fc285b11d6d5ea8598c550 (diff)
downloadseaweedfs-1dc25218cdf59f26854c1610b51381269c372117.tar.xz
seaweedfs-1dc25218cdf59f26854c1610b51381269c372117.zip
delay deleting from memory unless the metadata chunks is updated
-rw-r--r--weed/filesys/dirty_pages_stream.go3
-rw-r--r--weed/filesys/page_writer/chunked_stream_writer.go7
-rw-r--r--weed/filesys/page_writer/chunked_stream_writer_test.go2
3 files changed, 5 insertions, 7 deletions
diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go
index 586b73698..4a0eacb39 100644
--- a/weed/filesys/dirty_pages_stream.go
+++ b/weed/filesys/dirty_pages_stream.go
@@ -52,7 +52,6 @@ func (pages *StreamDirtyPages) FlushData() error {
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
- pages.chunkedStream.Reset()
return nil
}
@@ -102,5 +101,5 @@ func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader,
}
func (pages StreamDirtyPages) Destroy() {
- pages.chunkedStream.Reset()
+ pages.chunkedStream.Destroy()
}
diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go
index b4314e78f..be5f7ad2d 100644
--- a/weed/filesys/page_writer/chunked_stream_writer.go
+++ b/weed/filesys/page_writer/chunked_stream_writer.go
@@ -57,7 +57,6 @@ func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
if memChunk.usage.IsComplete(cw.ChunkSize) {
if cw.saveToStorageFn != nil {
cw.saveOneChunk(memChunk, logicChunkIndex)
- delete(cw.activeChunks, logicChunkIndex)
}
}
@@ -92,7 +91,6 @@ func (cw *ChunkedStreamWriter) FlushAll() {
for logicChunkIndex, memChunk := range cw.activeChunks {
if cw.saveToStorageFn != nil {
cw.saveOneChunk(memChunk, logicChunkIndex)
- delete(cw.activeChunks, logicChunkIndex)
}
}
}
@@ -102,6 +100,7 @@ func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex
for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
+ delete(cw.activeChunks, logicChunkIndex)
atomic.AddInt32(&referenceCounter, -1)
if atomic.LoadInt32(&referenceCounter) == 0 {
mem.Free(memChunk.buf)
@@ -110,8 +109,8 @@ func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex
}
}
-// Reset releases used resources
-func (cw *ChunkedStreamWriter) Reset() {
+// Destroy releases used resources
+func (cw *ChunkedStreamWriter) Destroy() {
for t, memChunk := range cw.activeChunks {
mem.Free(memChunk.buf)
delete(cw.activeChunks, t)
diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go
index 364148c14..7ab8ceb87 100644
--- a/weed/filesys/page_writer/chunked_stream_writer_test.go
+++ b/weed/filesys/page_writer/chunked_stream_writer_test.go
@@ -8,7 +8,7 @@ import (
func TestWriteChunkedStream(t *testing.T) {
x := NewChunkedStreamWriter(20)
- defer x.Reset()
+ defer x.Destroy()
y := NewChunkedFileWriter(os.TempDir(), 12)
defer y.Destroy()