diff options
Diffstat (limited to 'weed/mount/page_writer/page_chunk_swapfile.go')
| -rw-r--r-- | weed/mount/page_writer/page_chunk_swapfile.go | 125 |
1 files changed, 77 insertions, 48 deletions
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index bf2cdb256..6cedc64df 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -15,12 +15,12 @@ var ( type ActualChunkIndex int type SwapFile struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - logicToActualChunkIndexLock sync.Mutex - chunkSize int64 - freeActualChunkList []ActualChunkIndex + dir string + file *os.File + chunkSize int64 + chunkTrackingLock sync.Mutex + activeChunkCount int + freeActualChunkList []ActualChunkIndex } type SwapFileChunk struct { @@ -29,14 +29,15 @@ type SwapFileChunk struct { usage *ChunkWrittenIntervalList logicChunkIndex LogicChunkIndex actualChunkIndex ActualChunkIndex + activityScore *ActivityScore + //memChunk *MemChunk } func NewSwapFile(dir string, chunkSize int64) *SwapFile { return &SwapFile{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - chunkSize: chunkSize, + dir: dir, + file: nil, + chunkSize: chunkSize, } } func (sf *SwapFile) FreeResource() { @@ -46,7 +47,7 @@ func (sf *SwapFile) FreeResource() { } } -func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { +func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { if sf.file == nil { var err error sf.file, err = os.CreateTemp(sf.dir, "") @@ -55,70 +56,98 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF return nil } } - sf.logicToActualChunkIndexLock.Lock() - defer sf.logicToActualChunkIndexLock.Unlock() - actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] - if !found { - if len(sf.freeActualChunkList) > 0 { - actualChunkIndex = sf.freeActualChunkList[0] - sf.freeActualChunkList = sf.freeActualChunkList[1:] - } else { - actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) - } - sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex + sf.chunkTrackingLock.Lock() + defer sf.chunkTrackingLock.Unlock() + + sf.activeChunkCount++ + + // assign a new physical chunk + var actualChunkIndex ActualChunkIndex + if len(sf.freeActualChunkList) > 0 { + actualChunkIndex = sf.freeActualChunkList[0] + sf.freeActualChunkList = sf.freeActualChunkList[1:] + } else { + actualChunkIndex = ActualChunkIndex(sf.activeChunkCount) } - return &SwapFileChunk{ + swapFileChunk := &SwapFileChunk{ swapfile: sf, usage: newChunkWrittenIntervalList(), logicChunkIndex: logicChunkIndex, actualChunkIndex: actualChunkIndex, + activityScore: NewActivityScore(), + // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize), } + + // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf) + return swapFileChunk } func (sc *SwapFileChunk) FreeResource() { - sc.swapfile.logicToActualChunkIndexLock.Lock() - defer sc.swapfile.logicToActualChunkIndexLock.Unlock() sc.Lock() defer sc.Unlock() + sc.swapfile.chunkTrackingLock.Lock() + defer sc.swapfile.chunkTrackingLock.Unlock() + sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex) - delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex) + sc.swapfile.activeChunkCount-- + // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile) } -func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { +func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { sc.Lock() defer sc.Unlock() + // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex) + innerOffset := offset % sc.swapfile.chunkSize var err error n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) - if err == nil { - sc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) - } else { + sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs) + if err != nil { glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err) } + //sc.memChunk.WriteDataAt(src, offset, tsNs) + sc.activityScore.MarkWrite() + return } -func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { +func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { sc.RLock() defer sc.RUnlock() + // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex) + + //memCopy := make([]byte, len(p)) + //copy(memCopy, p) + chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { logicStart := max(off, chunkStartOffset+t.StartOffset) logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) if logicStart < logicStop { - actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { - glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) - break + if t.TsNs >= tsNs { + actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize + if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) + break + } + maxStop = max(maxStop, logicStop) + } else { + println("read old data2", tsNs-t.TsNs, "ns") } - maxStop = max(maxStop, logicStop) } } + //sc.memChunk.ReadDataAt(memCopy, off, tsNs) + //if bytes.Compare(memCopy, p) != 0 { + // println("read wrong data from swap file", off, sc.logicChunkIndex) + //} + + sc.activityScore.MarkRead() + return } @@ -128,27 +157,27 @@ func (sc *SwapFileChunk) IsComplete() bool { return sc.usage.IsComplete(sc.swapfile.chunkSize) } -func (sc *SwapFileChunk) WrittenSize() int64 { - sc.RLock() - defer sc.RUnlock() - return sc.usage.WrittenSize() +func (sc *SwapFileChunk) ActivityScore() int64 { + return sc.activityScore.ActivityScore() } func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { + sc.RLock() + defer sc.RUnlock() + if saveFn == nil { return } - sc.Lock() - defer sc.Unlock() - + // println(sc.logicChunkIndex, "|", "save") for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { data := mem.Allocate(int(t.Size())) - sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) - reader := util.NewBytesReader(data) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() { - }) + n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) + if n > 0 { + reader := util.NewBytesReader(data[:n]) + saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() { + }) + } mem.Free(data) } - sc.usage = newChunkWrittenIntervalList() } |
