diff options
| author | chrislu <chris.lu@gmail.com> | 2022-01-15 07:40:29 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-01-15 07:40:29 -0800 |
| commit | 1dc25218cdf59f26854c1610b51381269c372117 (patch) | |
| tree | fe830829908f61cc84d4db830d023b113a1ebcc5 | |
| parent | 2c95008a1a650420b0fc285b11d6d5ea8598c550 (diff) | |
| download | seaweedfs-1dc25218cdf59f26854c1610b51381269c372117.tar.xz seaweedfs-1dc25218cdf59f26854c1610b51381269c372117.zip | |
delay deleting from memory unless the metadata chunks is updated
| -rw-r--r-- | weed/filesys/dirty_pages_stream.go | 3 | ||||
| -rw-r--r-- | weed/filesys/page_writer/chunked_stream_writer.go | 7 | ||||
| -rw-r--r-- | weed/filesys/page_writer/chunked_stream_writer_test.go | 2 |
3 files changed, 5 insertions, 7 deletions
diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go index 586b73698..4a0eacb39 100644 --- a/weed/filesys/dirty_pages_stream.go +++ b/weed/filesys/dirty_pages_stream.go @@ -52,7 +52,6 @@ func (pages *StreamDirtyPages) FlushData() error { if pages.lastErr != nil { return fmt.Errorf("flush data: %v", pages.lastErr) } - pages.chunkedStream.Reset() return nil } @@ -102,5 +101,5 @@ func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, } func (pages StreamDirtyPages) Destroy() { - pages.chunkedStream.Reset() + pages.chunkedStream.Destroy() } diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go index b4314e78f..be5f7ad2d 100644 --- a/weed/filesys/page_writer/chunked_stream_writer.go +++ b/weed/filesys/page_writer/chunked_stream_writer.go @@ -57,7 +57,6 @@ func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) { if memChunk.usage.IsComplete(cw.ChunkSize) { if cw.saveToStorageFn != nil { cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) } } @@ -92,7 +91,6 @@ func (cw *ChunkedStreamWriter) FlushAll() { for logicChunkIndex, memChunk := range cw.activeChunks { if cw.saveToStorageFn != nil { cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) } } } @@ -102,6 +100,7 @@ func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { + delete(cw.activeChunks, logicChunkIndex) atomic.AddInt32(&referenceCounter, -1) if atomic.LoadInt32(&referenceCounter) == 0 { mem.Free(memChunk.buf) @@ -110,8 +109,8 @@ func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex } } -// Reset releases used resources -func (cw *ChunkedStreamWriter) Reset() { +// Destroy releases used resources +func (cw *ChunkedStreamWriter) Destroy() { for t, memChunk := range cw.activeChunks { mem.Free(memChunk.buf) delete(cw.activeChunks, t) diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go index 364148c14..7ab8ceb87 100644 --- a/weed/filesys/page_writer/chunked_stream_writer_test.go +++ b/weed/filesys/page_writer/chunked_stream_writer_test.go @@ -8,7 +8,7 @@ import ( func TestWriteChunkedStream(t *testing.T) { x := NewChunkedStreamWriter(20) - defer x.Reset() + defer x.Destroy() y := NewChunkedFileWriter(os.TempDir(), 12) defer y.Destroy() |
