aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-15 06:41:42 -0800
committerchrislu <chris.lu@gmail.com>2022-01-15 06:41:42 -0800
commit8f9d1c1e3c3a5b1f51a1da4985a138faa0d2b818 (patch)
tree73f77114c5ae6d5a644e47a8aa368b3feb131ade
parent1bd6d289d48ea3ae3c1461bf090ce0ffa6f2505f (diff)
downloadseaweedfs-8f9d1c1e3c3a5b1f51a1da4985a138faa0d2b818.tar.xz
seaweedfs-8f9d1c1e3c3a5b1f51a1da4985a138faa0d2b818.zip
upload only not flushed chunks
-rw-r--r--weed/filesys/dirty_pages_temp_file.go4
-rw-r--r--weed/filesys/page_writer/chunk_interval_list.go8
-rw-r--r--weed/filesys/page_writer/chunked_file_writer.go8
-rw-r--r--weed/filesys/page_writer/chunked_file_writer_test.go4
-rw-r--r--weed/filesys/page_writer/chunked_stream_writer_test.go2
5 files changed, 18 insertions, 8 deletions
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index e0c3a91de..4297ec62d 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -51,7 +51,6 @@ func (pages *TempFileDirtyPages) FlushData() error {
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
- pages.chunkedFile.Reset()
return nil
}
@@ -68,6 +67,7 @@ func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
+ interval.MarkFlushed()
})
}
@@ -102,5 +102,5 @@ func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reade
}
func (pages TempFileDirtyPages) Destroy() {
- pages.chunkedFile.Reset()
+ pages.chunkedFile.Destroy()
}
diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go
index dca9a1740..838b1cdfe 100644
--- a/weed/filesys/page_writer/chunk_interval_list.go
+++ b/weed/filesys/page_writer/chunk_interval_list.go
@@ -6,6 +6,7 @@ import "math"
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
+ flushed bool
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
@@ -18,6 +19,10 @@ func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
return interval.stopOffset-interval.StartOffset == chunkSize
}
+func (interval *ChunkWrittenInterval) MarkFlushed() {
+ interval.flushed = true
+}
+
// ChunkWrittenIntervalList mark written intervals within one page chunk
type ChunkWrittenIntervalList struct {
head *ChunkWrittenInterval
@@ -64,18 +69,21 @@ func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
+ p.flushed = false
unlinkNodesBetween(p, q.next)
return
}
if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
+ p.flushed = false
unlinkNodesBetween(p, q)
return
}
if q.StartOffset <= interval.stopOffset {
// merge new interval into q
q.StartOffset = interval.StartOffset
+ q.flushed = false
unlinkNodesBetween(p, q)
return
}
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
index b0e1c2844..d2f94dafd 100644
--- a/weed/filesys/page_writer/chunked_file_writer.go
+++ b/weed/filesys/page_writer/chunked_file_writer.go
@@ -106,13 +106,15 @@ func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, log
for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
chunkUsage := cw.chunkUsages[actualChunkIndex]
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
- process(cw.file, logicChunkIndex, t)
+ if !t.flushed {
+ process(cw.file, logicChunkIndex, t)
+ }
}
}
}
-// Reset releases used resources
-func (cw *ChunkedFileWriter) Reset() {
+// Destroy releases used resources
+func (cw *ChunkedFileWriter) Destroy() {
if cw.file != nil {
cw.file.Close()
os.Remove(cw.file.Name())
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
index 244ed62c3..1c72c77d4 100644
--- a/weed/filesys/page_writer/chunked_file_writer_test.go
+++ b/weed/filesys/page_writer/chunked_file_writer_test.go
@@ -35,9 +35,9 @@ func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
func TestWriteChunkedFile(t *testing.T) {
x := NewChunkedFileWriter(os.TempDir(), 20)
- defer x.Reset()
+ defer x.Destroy()
y := NewChunkedFileWriter(os.TempDir(), 12)
- defer y.Reset()
+ defer y.Destroy()
batchSize := 4
buf := make([]byte, batchSize)
diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go
index 3c55a91ad..364148c14 100644
--- a/weed/filesys/page_writer/chunked_stream_writer_test.go
+++ b/weed/filesys/page_writer/chunked_stream_writer_test.go
@@ -10,7 +10,7 @@ func TestWriteChunkedStream(t *testing.T) {
x := NewChunkedStreamWriter(20)
defer x.Reset()
y := NewChunkedFileWriter(os.TempDir(), 12)
- defer y.Reset()
+ defer y.Destroy()
batchSize := 4
buf := make([]byte, batchSize)