aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/page_writer/page_chunk_swapfile.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/page_writer/page_chunk_swapfile.go')
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go125
1 files changed, 77 insertions, 48 deletions
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index bf2cdb256..6cedc64df 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -15,12 +15,12 @@ var (
type ActualChunkIndex int
type SwapFile struct {
- dir string
- file *os.File
- logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
- logicToActualChunkIndexLock sync.Mutex
- chunkSize int64
- freeActualChunkList []ActualChunkIndex
+ dir string
+ file *os.File
+ chunkSize int64
+ chunkTrackingLock sync.Mutex
+ activeChunkCount int
+ freeActualChunkList []ActualChunkIndex
}
type SwapFileChunk struct {
@@ -29,14 +29,15 @@ type SwapFileChunk struct {
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
+ activityScore *ActivityScore
+ //memChunk *MemChunk
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
- dir: dir,
- file: nil,
- logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
- chunkSize: chunkSize,
+ dir: dir,
+ file: nil,
+ chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
@@ -46,7 +47,7 @@ func (sf *SwapFile) FreeResource() {
}
}
-func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
+func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
@@ -55,70 +56,98 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
return nil
}
}
- sf.logicToActualChunkIndexLock.Lock()
- defer sf.logicToActualChunkIndexLock.Unlock()
- actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
- if !found {
- if len(sf.freeActualChunkList) > 0 {
- actualChunkIndex = sf.freeActualChunkList[0]
- sf.freeActualChunkList = sf.freeActualChunkList[1:]
- } else {
- actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
- }
- sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
+ sf.chunkTrackingLock.Lock()
+ defer sf.chunkTrackingLock.Unlock()
+
+ sf.activeChunkCount++
+
+ // assign a new physical chunk
+ var actualChunkIndex ActualChunkIndex
+ if len(sf.freeActualChunkList) > 0 {
+ actualChunkIndex = sf.freeActualChunkList[0]
+ sf.freeActualChunkList = sf.freeActualChunkList[1:]
+ } else {
+ actualChunkIndex = ActualChunkIndex(sf.activeChunkCount)
}
- return &SwapFileChunk{
+ swapFileChunk := &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
+ activityScore: NewActivityScore(),
+ // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize),
}
+
+ // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf)
+ return swapFileChunk
}
func (sc *SwapFileChunk) FreeResource() {
- sc.swapfile.logicToActualChunkIndexLock.Lock()
- defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
sc.Lock()
defer sc.Unlock()
+ sc.swapfile.chunkTrackingLock.Lock()
+ defer sc.swapfile.chunkTrackingLock.Unlock()
+
sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
- delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
+ sc.swapfile.activeChunkCount--
+ // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile)
}
-func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
+func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
sc.Lock()
defer sc.Unlock()
+ // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex)
+
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
- if err == nil {
- sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
- } else {
+ sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
+ if err != nil {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
+ //sc.memChunk.WriteDataAt(src, offset, tsNs)
+ sc.activityScore.MarkWrite()
+
return
}
-func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
sc.RLock()
defer sc.RUnlock()
+ // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex)
+
+ //memCopy := make([]byte, len(p))
+ //copy(memCopy, p)
+
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
- actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
- if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
- glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
- break
+ if t.TsNs >= tsNs {
+ actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
+ if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ } else {
+ println("read old data2", tsNs-t.TsNs, "ns")
}
- maxStop = max(maxStop, logicStop)
}
}
+ //sc.memChunk.ReadDataAt(memCopy, off, tsNs)
+ //if bytes.Compare(memCopy, p) != 0 {
+ // println("read wrong data from swap file", off, sc.logicChunkIndex)
+ //}
+
+ sc.activityScore.MarkRead()
+
return
}
@@ -128,27 +157,27 @@ func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
-func (sc *SwapFileChunk) WrittenSize() int64 {
- sc.RLock()
- defer sc.RUnlock()
- return sc.usage.WrittenSize()
+func (sc *SwapFileChunk) ActivityScore() int64 {
+ return sc.activityScore.ActivityScore()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
+ sc.RLock()
+ defer sc.RUnlock()
+
if saveFn == nil {
return
}
- sc.Lock()
- defer sc.Unlock()
-
+ // println(sc.logicChunkIndex, "|", "save")
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
- sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
- reader := util.NewBytesReader(data)
- saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
- })
+ n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
+ if n > 0 {
+ reader := util.NewBytesReader(data[:n])
+ saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() {
+ })
+ }
mem.Free(data)
}
- sc.usage = newChunkWrittenIntervalList()
}