aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-17 23:23:49 -0800
committerchrislu <chris.lu@gmail.com>2022-01-17 23:23:49 -0800
commit9274557552e3f3f4b6f7071c47ba8d7aa5c6f64a (patch)
treeaf724d9840cd4e633c3776cae7962e7aa94cfdf6
parentc87b8f4c302d225cce45cdfc312c35ddbf7c8f86 (diff)
downloadseaweedfs-9274557552e3f3f4b6f7071c47ba8d7aa5c6f64a.tar.xz
seaweedfs-9274557552e3f3f4b6f7071c47ba8d7aa5c6f64a.zip
keep dirty pages based on temp file
-rw-r--r--weed/filesys/dirty_pages_temp_file.go112
-rw-r--r--weed/filesys/page_writer.go1
-rw-r--r--weed/filesys/page_writer/chunked_file_writer.go159
-rw-r--r--weed/filesys/page_writer/chunked_file_writer_test.go60
4 files changed, 332 insertions, 0 deletions
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
new file mode 100644
index 000000000..12f5b5159
--- /dev/null
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -0,0 +1,112 @@
+package filesys
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
+ "os"
+ "sync"
+ "time"
+)
+
+type TempFileDirtyPages struct {
+ f *File
+ writeWaitGroup sync.WaitGroup
+ pageAddLock sync.Mutex
+ chunkAddLock sync.Mutex
+ lastErr error
+ collection string
+ replication string
+ chunkedFile *page_writer.ChunkedFileWriter
+}
+
+func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
+
+ tempFile := &TempFileDirtyPages{
+ f: file,
+ chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
+ }
+
+ return tempFile
+}
+
+func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
+
+ pages.pageAddLock.Lock()
+ defer pages.pageAddLock.Unlock()
+
+ glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+ if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
+ pages.lastErr = err
+ }
+
+ return
+}
+
+func (pages *TempFileDirtyPages) FlushData() error {
+ pages.saveChunkedFileToStorage()
+ pages.writeWaitGroup.Wait()
+ if pages.lastErr != nil {
+ return fmt.Errorf("flush data: %v", pages.lastErr)
+ }
+ pages.chunkedFile.Reset()
+ return nil
+}
+
+func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ return pages.chunkedFile.ReadDataAt(data, startOffset)
+}
+
+func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
+ return pages.collection, pages.replication
+}
+
+func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
+
+ pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
+ reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
+ pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
+ })
+
+}
+
+func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) {
+
+ mtime := time.Now().UnixNano()
+ pages.writeWaitGroup.Add(1)
+ writer := func() {
+ defer pages.writeWaitGroup.Done()
+
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
+ if err != nil {
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
+ pages.lastErr = err
+ return
+ }
+ chunk.Mtime = mtime
+ pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ defer pages.chunkAddLock.Unlock()
+ pages.f.addChunks([]*filer_pb.FileChunk{chunk})
+ glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
+ }
+
+ if pages.f.wfs.concurrentWriters != nil {
+ pages.f.wfs.concurrentWriters.Execute(writer)
+ } else {
+ go writer()
+ }
+
+}
+
+func (pages TempFileDirtyPages) Destroy() {
+ pages.chunkedFile.Reset()
+}
+
+func (pages *TempFileDirtyPages) LockForRead(startOffset, stopOffset int64) {
+}
+
+func (pages *TempFileDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
+}
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go
index c6d08348d..b8e884f58 100644
--- a/weed/filesys/page_writer.go
+++ b/weed/filesys/page_writer.go
@@ -25,6 +25,7 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
chunkSize: chunkSize,
writerPattern: NewWriterPattern(chunkSize),
randomWriter: newMemoryChunkPages(fh, chunkSize),
+ // randomWriter: newTempFileDirtyPages(fh.f, chunkSize),
}
return pw
}
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
new file mode 100644
index 000000000..4dad46c69
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer.go
@@ -0,0 +1,159 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "io"
+ "os"
+ "sync"
+)
+
+type ActualChunkIndex int
+
+// ChunkedFileWriter assumes the write requests will come in within chunks
+type ChunkedFileWriter struct {
+ dir string
+ file *os.File
+ logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
+ chunkUsages []*ChunkWrittenIntervalList
+ ChunkSize int64
+ sync.Mutex
+}
+
+var _ = io.WriterAt(&ChunkedFileWriter{})
+
+func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
+ return &ChunkedFileWriter{
+ dir: dir,
+ file: nil,
+ logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
+ ChunkSize: chunkSize,
+ }
+}
+
+func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
+ cw.Lock()
+ defer cw.Unlock()
+
+ if cw.file == nil {
+ cw.file, err = os.CreateTemp(cw.dir, "")
+ if err != nil {
+ glog.Errorf("create temp file: %v", err)
+ return
+ }
+ }
+
+ actualOffset, chunkUsage := cw.toActualWriteOffset(off)
+ n, err = cw.file.WriteAt(p, actualOffset)
+ if err == nil {
+ startOffset := off % cw.ChunkSize
+ chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
+ }
+ return
+}
+
+func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ cw.Lock()
+ defer cw.Unlock()
+
+ if cw.file == nil {
+ return
+ }
+
+ logicChunkIndex := off / cw.ChunkSize
+ actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
+ if chunkUsage != nil {
+ for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+ logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
+ logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
+ if logicStart < logicStop {
+ actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
+ _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
+ if err != nil {
+ glog.Errorf("reading temp file: %v", err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ }
+ return
+}
+
+func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
+ logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
+ offsetRemainder := logicOffset % cw.ChunkSize
+ existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+ if found {
+ return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
+ }
+ cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
+ chunkUsage = newChunkWrittenIntervalList()
+ cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
+ return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
+}
+
+func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
+ logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
+ existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+ if found {
+ return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
+ }
+ return 0, nil
+}
+
+func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
+ for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
+ chunkUsage := cw.chunkUsages[actualChunkIndex]
+ for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+ process(cw.file, logicChunkIndex, t)
+ }
+ }
+}
+
+// Reset releases used resources
+func (cw *ChunkedFileWriter) Reset() {
+ if cw.file != nil {
+ cw.file.Close()
+ os.Remove(cw.file.Name())
+ cw.file = nil
+ }
+ cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
+ cw.chunkUsages = cw.chunkUsages[:0]
+}
+
+type FileIntervalReader struct {
+ f *os.File
+ startOffset int64
+ stopOffset int64
+ position int64
+}
+
+var _ = io.Reader(&FileIntervalReader{})
+
+func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
+ actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+ if !found {
+ // this should never happen
+ return nil
+ }
+ return &FileIntervalReader{
+ f: cw.file,
+ startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
+ stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
+ position: 0,
+ }
+}
+
+func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
+ readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
+ n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
+ if err == nil || err == io.EOF {
+ fr.position += int64(n)
+ if fr.stopOffset-fr.startOffset-fr.position == 0 {
+ // return a tiny bit faster
+ err = io.EOF
+ return
+ }
+ }
+ return
+}
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
new file mode 100644
index 000000000..244ed62c3
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer_test.go
@@ -0,0 +1,60 @@
+package page_writer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestChunkedFileWriter_toActualOffset(t *testing.T) {
+ cw := NewChunkedFileWriter("", 16)
+
+ writeToFile(cw, 50, 60)
+ writeToFile(cw, 60, 64)
+
+ writeToFile(cw, 32, 40)
+ writeToFile(cw, 42, 48)
+
+ writeToFile(cw, 48, 50)
+
+ assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
+ assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
+
+}
+
+func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
+
+ _, chunkUsage := cw.toActualWriteOffset(startOffset)
+
+ // skip doing actual writing
+
+ innerOffset := startOffset % cw.ChunkSize
+ chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
+
+}
+
+func TestWriteChunkedFile(t *testing.T) {
+ x := NewChunkedFileWriter(os.TempDir(), 20)
+ defer x.Reset()
+ y := NewChunkedFileWriter(os.TempDir(), 12)
+ defer y.Reset()
+
+ batchSize := 4
+ buf := make([]byte, batchSize)
+ for i := 0; i < 256; i++ {
+ for x := 0; x < batchSize; x++ {
+ buf[x] = byte(i)
+ }
+ x.WriteAt(buf, int64(i*batchSize))
+ y.WriteAt(buf, int64((255-i)*batchSize))
+ }
+
+ a := make([]byte, 1)
+ b := make([]byte, 1)
+ for i := 0; i < 256*batchSize; i++ {
+ x.ReadDataAt(a, int64(i))
+ y.ReadDataAt(b, int64(256*batchSize-1-i))
+ assert.Equal(t, a[0], b[0], "same read")
+ }
+
+}