Diffstat (limited to 'weed/mount')
-rw-r--r--  weed/mount/dirty_pages_chunked.go                   |  16
-rw-r--r--  weed/mount/filehandle.go                            | 109
-rw-r--r--  weed/mount/filehandle_map.go                        |   4
-rw-r--r--  weed/mount/filehandle_read.go                       |  43
-rw-r--r--  weed/mount/page_writer.go                           |  12
-rw-r--r--  weed/mount/page_writer/activity_score.go            |  39
-rw-r--r--  weed/mount/page_writer/chunk_interval_list.go       |  83
-rw-r--r--  weed/mount/page_writer/chunk_interval_list_test.go  |  72
-rw-r--r--  weed/mount/page_writer/dirty_pages.go               |   4
-rw-r--r--  weed/mount/page_writer/page_chunk.go                |   8
-rw-r--r--  weed/mount/page_writer/page_chunk_mem.go            |  31
-rw-r--r--  weed/mount/page_writer/page_chunk_swapfile.go       | 125
-rw-r--r--  weed/mount/page_writer/upload_pipeline.go           |  40
-rw-r--r--  weed/mount/page_writer/upload_pipeline_test.go      |   4
-rw-r--r--  weed/mount/weedfs_attr.go                           |  20
-rw-r--r--  weed/mount/weedfs_file_copy_range.go                |  12
-rw-r--r--  weed/mount/weedfs_file_lseek.go                     |  43
-rw-r--r--  weed/mount/weedfs_file_read.go                      |  28
-rw-r--r--  weed/mount/weedfs_file_sync.go                      |  15
-rw-r--r--  weed/mount/weedfs_file_write.go                     |  15
-rw-r--r--  weed/mount/weedfs_write.go                          |   4
21 files changed, 413 insertions, 314 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index 78e7b7877..56c97549f 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
- "time"
)
type ChunkedDirtyPages struct {
@@ -38,11 +37,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
return dirtyPages
}
-func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
+func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
- pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
+ pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
return
}
@@ -58,28 +57,27 @@ func (pages *ChunkedDirtyPages) FlushData() error {
return nil
}
-func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) {
if !pages.hasWrites {
return
}
- return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
+ return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs)
}
-func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
+func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) {
- mtime := time.Now().UnixNano()
defer cleanupFn()
fileFullPath := pages.fh.FullPath()
fileName := fileFullPath.Name()
- chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
+ chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs)
if err != nil {
glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
pages.lastErr = err
return
}
- chunk.ModifiedTsNs = mtime
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
+ pages.fh.entryChunkGroup.AddChunk(chunk)
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
}
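The change above threads the write timestamp through the save path: instead of stamping the chunk with time.Now() at upload time, saveChunkedFileIntervalToStorage now receives modifiedTsNs from the page that buffered the write, and the weedfs_write.go hunk further down passes the same value to ToPbFileChunk so the stored chunk can carry it. A minimal self-contained sketch of the idea, with hypothetical stand-in types rather than the mount API:

    package main

    import (
    	"fmt"
    	"time"
    )

    // hypothetical stand-in for filer_pb.FileChunk
    type fileChunk struct {
    	Offset       int64
    	ModifiedTsNs int64
    }

    // the save function takes the caller's write timestamp instead of calling
    // time.Now() itself, so ModifiedTsNs reflects when the data was written
    // into the page cache, not when the asynchronous upload happened
    func saveDataAsChunk(offset int64, modifiedTsNs int64) *fileChunk {
    	return &fileChunk{Offset: offset, ModifiedTsNs: modifiedTsNs}
    }

    func main() {
    	tsNs := time.Now().UnixNano()           // captured once, at the FUSE Write call
    	time.Sleep(10 * time.Millisecond)       // the upload happens later
    	chunk := saveDataAsChunk(0, tsNs)       // the timestamp survives the delay unchanged
    	fmt.Println(chunk.ModifiedTsNs == tsNs) // true
    }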
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index b6ec3d2da..67298b047 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -5,50 +5,60 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "golang.org/x/exp/slices"
- "golang.org/x/sync/semaphore"
- "math"
+ "os"
"sync"
)
type FileHandleId uint64
+var IsDebugFileReadWrite = false
+
type FileHandle struct {
- fh FileHandleId
- counter int64
- entry *LockedEntry
- entryLock sync.Mutex
- inode uint64
- wfs *WFS
+ fh FileHandleId
+ counter int64
+ entry *LockedEntry
+ entryLock sync.Mutex
+ entryChunkGroup *filer.ChunkGroup
+ inode uint64
+ wfs *WFS
// cache file has been written to
- dirtyMetadata bool
- dirtyPages *PageWriter
- entryViewCache []filer.VisibleInterval
- reader *filer.ChunkReadAt
- contentType string
- handle uint64
- orderedMutex *semaphore.Weighted
+ dirtyMetadata bool
+ dirtyPages *PageWriter
+ reader *filer.ChunkReadAt
+ contentType string
+ handle uint64
+ sync.Mutex
isDeleted bool
+
+ // for debugging
+ mirrorFile *os.File
}
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
fh := &FileHandle{
- fh: handleId,
- counter: 1,
- inode: inode,
- wfs: wfs,
- orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
+ fh: handleId,
+ counter: 1,
+ inode: inode,
+ wfs: wfs,
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
- if entry != nil {
- entry.Attributes.FileSize = filer.FileSize(entry)
- }
fh.entry = &LockedEntry{
Entry: entry,
}
+ if entry != nil {
+ fh.SetEntry(entry)
+ }
+
+ if IsDebugFileReadWrite {
+ var err error
+ fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
+ if err != nil {
+ println("failed to create mirror:", err.Error())
+ }
+ }
return fh
}
@@ -63,6 +73,17 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
}
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
+ if entry != nil {
+ fileSize := filer.FileSize(entry)
+ entry.Attributes.FileSize = fileSize
+ var resolveManifestErr error
+ fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
+ if resolveManifestErr != nil {
+ glog.Warningf("failed to resolve manifest chunks in %+v", entry)
+ }
+ } else {
+ glog.Fatalf("setting file handle entry to nil")
+ }
fh.entry.SetEntry(entry)
}
@@ -78,43 +99,17 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
return
}
- // find the earliest incoming chunk
- newChunks := chunks
- earliestChunk := newChunks[0]
- for i := 1; i < len(newChunks); i++ {
- if lessThan(earliestChunk, newChunks[i]) {
- earliestChunk = newChunks[i]
- }
- }
-
- // pick out-of-order chunks from existing chunks
- for _, chunk := range fh.entry.GetChunks() {
- if lessThan(earliestChunk, chunk) {
- chunks = append(chunks, chunk)
- }
- }
-
- // sort incoming chunks
- slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
- return lessThan(a, b)
- })
-
- glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
-
- fh.entry.AppendChunks(newChunks)
- fh.entryViewCache = nil
+ fh.entry.AppendChunks(chunks)
}
-func (fh *FileHandle) CloseReader() {
- if fh.reader != nil {
- _ = fh.reader.Close()
- fh.reader = nil
- }
-}
+func (fh *FileHandle) ReleaseHandle() {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
-func (fh *FileHandle) Release() {
fh.dirtyPages.Destroy()
- fh.CloseReader()
+ if IsDebugFileReadWrite {
+ fh.mirrorFile.Close()
+ }
}
func lessThan(a, b *filer_pb.FileChunk) bool {
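Two structural changes in FileHandle are worth noting: the per-handle orderedMutex (a semaphore.Weighted always acquired with weight 1) is replaced by an embedded sync.Mutex, and SetEntry now eagerly builds a filer.ChunkGroup so manifest chunks are resolved once per handle instead of through the removed entryViewCache. A small sketch of the embedded-mutex pattern that the later Read, Write, CopyFileRange, Lseek and doFlush hunks switch to; the struct here is a hypothetical reduction of FileHandle:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // embedding sync.Mutex lets callers write fh.Lock()/fh.Unlock() directly,
    // replacing fh.orderedMutex.Acquire(context.Background(), 1) / Release(1)
    type fileHandle struct {
    	sync.Mutex
    	counter int64
    }

    func (fh *fileHandle) withHandleLocked(fn func()) {
    	fh.Lock()
    	defer fh.Unlock()
    	fn()
    }

    func main() {
    	fh := &fileHandle{}
    	fh.withHandleLocked(func() { fh.counter++ })
    	fmt.Println(fh.counter)
    }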
diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go
index cc5885ffc..f0051f061 100644
--- a/weed/mount/filehandle_map.go
+++ b/weed/mount/filehandle_map.go
@@ -65,7 +65,7 @@ func (i *FileHandleToInode) ReleaseByInode(inode uint64) {
if fh.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fh.fh)
- fh.Release()
+ fh.ReleaseHandle()
}
}
}
@@ -82,7 +82,7 @@ func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) {
if fhHandle.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fhHandle.fh)
- fhHandle.Release()
+ fhHandle.ReleaseHandle()
}
}
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index a316a16cd..be6d5d984 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -17,18 +17,20 @@ func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
}
-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
- maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs int64) (maxStop int64) {
+ maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset, tsNs)
return
}
-func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
fileFullPath := fh.FullPath()
entry := fh.GetEntry()
if entry == nil {
- return 0, io.EOF
+ return 0, 0, io.EOF
}
if entry.IsInRemoteOnly() {
@@ -36,43 +38,28 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
newEntry, err := fh.downloadRemoteEntry(entry)
if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
- return 0, err
+ return 0, 0, err
}
entry = newEntry
}
- fileSize := int64(filer.FileSize(entry))
+ fileSize := int64(entry.Attributes.FileSize)
+ if fileSize == 0 {
+ fileSize = int64(filer.FileSize(entry))
+ }
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
- return 0, io.EOF
+ return 0, 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
- return int64(totalRead), nil
- }
-
- var chunkResolveErr error
- if fh.entryViewCache == nil {
- fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize)
- if chunkResolveErr != nil {
- return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
- }
- fh.CloseReader()
- }
-
- if fh.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize)
- glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
- for _, chunkView := range chunkViews {
- glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
- }
- fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
+ return int64(totalRead), 0, nil
}
- totalRead, err := fh.reader.ReadAt(buff, offset)
+ totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
@@ -80,7 +67,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
- return int64(totalRead), err
+ return int64(totalRead), ts, err
}
func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index 1f31b5300..c9470c440 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -29,35 +29,35 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
return pw
}
-func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
+ pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs)
offset += writeSize
data = data[writeSize:]
}
}
-func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) {
- pw.randomWriter.AddPage(offset, data, isSequential)
+func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) {
+ pw.randomWriter.AddPage(offset, data, isSequential, tsNs)
}
func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData()
}
-func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64, tsNs int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
+ maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset, tsNs)
offset += readSize
data = data[readSize:]
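AddPage and ReadDirtyDataAt keep splitting a request at chunk boundaries with writeSize = min(len(data), (i+1)*chunkSize-offset); only the tsNs parameter is new, the arithmetic is unchanged. A runnable sketch of that splitting loop in isolation, where addToOneChunk is a hypothetical stub:

    package main

    import "fmt"

    const chunkSize = int64(8)

    func addToOneChunk(chunkIndex, offset int64, data []byte, tsNs int64) {
    	fmt.Printf("chunk %d: [%d,%d) tsNs=%d\n", chunkIndex, offset, offset+int64(len(data)), tsNs)
    }

    // same boundary arithmetic as PageWriter.AddPage: each iteration writes at
    // most up to the end of chunk i, then advances offset and trims data
    func addPage(offset int64, data []byte, tsNs int64) {
    	chunkIndex := offset / chunkSize
    	for i := chunkIndex; len(data) > 0; i++ {
    		writeSize := min64(int64(len(data)), (i+1)*chunkSize-offset)
    		addToOneChunk(i, offset, data[:writeSize], tsNs)
    		offset += writeSize
    		data = data[writeSize:]
    	}
    }

    func min64(a, b int64) int64 {
    	if a < b {
    		return a
    	}
    	return b
    }

    func main() {
    	// a 10-byte write starting at offset 5 spans two chunks:
    	// chunk 0 gets [5,8), chunk 1 gets [8,15)
    	addPage(5, make([]byte, 10), 1)
    }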
diff --git a/weed/mount/page_writer/activity_score.go b/weed/mount/page_writer/activity_score.go
new file mode 100644
index 000000000..22da87e37
--- /dev/null
+++ b/weed/mount/page_writer/activity_score.go
@@ -0,0 +1,39 @@
+package page_writer
+
+import "time"
+
+type ActivityScore struct {
+ lastActiveTsNs int64
+ decayedActivenessScore int64
+}
+
+func NewActivityScore() *ActivityScore {
+ return &ActivityScore{}
+}
+
+func (as ActivityScore) MarkRead() {
+ now := time.Now().UnixNano()
+ deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
+ as.lastActiveTsNs = now
+
+ as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 256
+ if as.decayedActivenessScore < 0 {
+ as.decayedActivenessScore = 0
+ }
+}
+
+func (as ActivityScore) MarkWrite() {
+ now := time.Now().UnixNano()
+ deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
+ as.lastActiveTsNs = now
+
+ as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 1024
+ if as.decayedActivenessScore < 0 {
+ as.decayedActivenessScore = 0
+ }
+}
+
+func (as ActivityScore) ActivityScore() int64 {
+ deltaTime := (time.Now().UnixNano() - as.lastActiveTsNs) >> 30 // about number of seconds
+ return as.decayedActivenessScore >> deltaTime
+}
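The new ActivityScore replaces WrittenSize as the eviction signal: each read adds 256 and each write adds 1024 to a score that decays by one binary order of magnitude per elapsed second, since shifting a nanosecond delta right by 30 bits gives roughly seconds (2^30 ns is about 1.07 s). A small sketch of the decay math; this standalone version uses a pointer receiver so the updates persist, and it is an illustration rather than the package's code:

    package main

    import (
    	"fmt"
    	"time"
    )

    type activityScore struct {
    	lastActiveTsNs int64
    	score          int64
    }

    // bump adds delta after decaying the old score by one bit per ~second
    func (as *activityScore) bump(delta int64) {
    	now := time.Now().UnixNano()
    	elapsedSeconds := (now - as.lastActiveTsNs) >> 30 // 2^30 ns is about one second
    	as.lastActiveTsNs = now
    	as.score = as.score>>elapsedSeconds + delta
    	if as.score < 0 {
    		as.score = 0
    	}
    }

    func main() {
    	as := &activityScore{}
    	as.bump(1024) // a write
    	as.bump(256)  // a read immediately after: score is now about 1280
    	fmt.Println(as.score)
    }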
diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go
index a9d64c8e4..005385c1a 100644
--- a/weed/mount/page_writer/chunk_interval_list.go
+++ b/weed/mount/page_writer/chunk_interval_list.go
@@ -8,6 +8,7 @@ import (
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
+ TsNs int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
@@ -42,10 +43,14 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
return list
}
-func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
+func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) {
+ if startOffset >= stopOffset {
+ return
+ }
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
+ TsNs: tsNs,
}
list.addInterval(interval)
}
@@ -62,50 +67,54 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
+ //t := list.head
+ //for ; t.next != nil; t = t.next {
+ // if t.TsNs > interval.TsNs {
+ // println("writes is out of order", t.TsNs-interval.TsNs, "ns")
+ // }
+ //}
+
p := list.head
- for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
+ for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
- for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
+ for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev {
}
- if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
- // merge p and q together
- p.stopOffset = q.stopOffset
- unlinkNodesBetween(p, q.next)
- return
+ // left side
+ // interval after p.next start
+ if p.next.StartOffset < interval.StartOffset {
+ t := &ChunkWrittenInterval{
+ StartOffset: p.next.StartOffset,
+ stopOffset: interval.StartOffset,
+ TsNs: p.next.TsNs,
+ }
+ p.next = t
+ t.prev = p
+ t.next = interval
+ interval.prev = t
+ } else {
+ p.next = interval
+ interval.prev = p
}
- if interval.StartOffset <= p.stopOffset {
- // merge new interval into p
- p.stopOffset = interval.stopOffset
- unlinkNodesBetween(p, q)
- return
- }
- if q.StartOffset <= interval.stopOffset {
- // merge new interval into q
- q.StartOffset = interval.StartOffset
- unlinkNodesBetween(p, q)
- return
- }
-
- // add the new interval between p and q
- unlinkNodesBetween(p, q)
- p.next = interval
- interval.prev = p
- q.prev = interval
- interval.next = q
-
-}
-// unlinkNodesBetween remove all nodes after start and before stop, exclusive
-func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
- if start.next == stop {
- return
+ // right side
+ // interval ends before p.prev
+ if interval.stopOffset < q.prev.stopOffset {
+ t := &ChunkWrittenInterval{
+ StartOffset: interval.stopOffset,
+ stopOffset: q.prev.stopOffset,
+ TsNs: q.prev.TsNs,
+ }
+ q.prev = t
+ t.next = q
+ interval.next = t
+ t.prev = interval
+ } else {
+ q.prev = interval
+ interval.next = q
}
- start.next.prev = nil
- start.next = stop
- stop.prev.next = nil
- stop.prev = start
+
}
func (list *ChunkWrittenIntervalList) size() int {
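addInterval no longer merges overlapping intervals into one: because every interval now carries its own TsNs, an overlapping write splits the existing interval and keeps the non-overlapped pieces with their original timestamps. A worked illustration of the resulting layout, shown as plain data rather than the linked list:

    package main

    import "fmt"

    // hypothetical flat view of the interval list after two writes
    type span struct {
    	start, stop, tsNs int64
    }

    func main() {
    	// MarkWritten(0, 5, 1) followed by MarkWritten(2, 3, 2):
    	// the older [0,5) interval is split around the newer write so that
    	// every byte remembers the timestamp of the write that produced it
    	after := []span{
    		{0, 2, 1}, // left remainder of the first write
    		{2, 3, 2}, // the newer write
    		{3, 5, 1}, // right remainder of the first write
    	}
    	for _, s := range after {
    		fmt.Printf("[%d,%d) tsNs=%d\n", s.start, s.stop, s.tsNs)
    	}
    }

This splitting is also why the updated unit test below expects the list size to grow in cases where the old merging implementation kept it constant.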
diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go
index b22f5eb5d..eb1d5ff46 100644
--- a/weed/mount/page_writer/chunk_interval_list_test.go
+++ b/weed/mount/page_writer/chunk_interval_list_test.go
@@ -10,40 +10,72 @@ func Test_PageChunkWrittenIntervalList(t *testing.T) {
assert.Equal(t, 0, list.size(), "empty list")
- list.MarkWritten(0, 5)
+ list.MarkWritten(0, 5, 1)
assert.Equal(t, 1, list.size(), "one interval")
- list.MarkWritten(0, 5)
+ list.MarkWritten(0, 5, 2)
assert.Equal(t, 1, list.size(), "duplicated interval2")
- list.MarkWritten(95, 100)
+ list.MarkWritten(95, 100, 3)
assert.Equal(t, 2, list.size(), "two intervals")
- list.MarkWritten(50, 60)
+ list.MarkWritten(50, 60, 4)
assert.Equal(t, 3, list.size(), "three intervals")
- list.MarkWritten(50, 55)
- assert.Equal(t, 3, list.size(), "three intervals merge")
+ list.MarkWritten(50, 55, 5)
+ assert.Equal(t, 4, list.size(), "three intervals merge")
- list.MarkWritten(40, 50)
- assert.Equal(t, 3, list.size(), "three intervals grow forward")
+ list.MarkWritten(40, 50, 6)
+ assert.Equal(t, 5, list.size(), "three intervals grow forward")
- list.MarkWritten(50, 65)
- assert.Equal(t, 3, list.size(), "three intervals grow backward")
+ list.MarkWritten(50, 65, 7)
+ assert.Equal(t, 4, list.size(), "three intervals grow backward")
- list.MarkWritten(70, 80)
- assert.Equal(t, 4, list.size(), "four intervals")
+ list.MarkWritten(70, 80, 8)
+ assert.Equal(t, 5, list.size(), "four intervals")
- list.MarkWritten(60, 70)
- assert.Equal(t, 3, list.size(), "three intervals merged")
+ list.MarkWritten(60, 70, 9)
+ assert.Equal(t, 6, list.size(), "three intervals merged")
- list.MarkWritten(59, 71)
- assert.Equal(t, 3, list.size(), "covered three intervals")
+ list.MarkWritten(59, 71, 10)
+ assert.Equal(t, 6, list.size(), "covered three intervals")
- list.MarkWritten(5, 59)
- assert.Equal(t, 2, list.size(), "covered two intervals")
+ list.MarkWritten(5, 59, 11)
+ assert.Equal(t, 5, list.size(), "covered two intervals")
- list.MarkWritten(70, 99)
- assert.Equal(t, 1, list.size(), "covered one intervals")
+ list.MarkWritten(70, 99, 12)
+ assert.Equal(t, 5, list.size(), "covered one intervals")
}
+
+type interval struct {
+ start int64
+ stop int64
+ expected bool
+}
+
+func Test_PageChunkWrittenIntervalList1(t *testing.T) {
+ list := newChunkWrittenIntervalList()
+ inputs := []interval{
+ {1, 5, true},
+ {2, 3, true},
+ }
+ for i, input := range inputs {
+ list.MarkWritten(input.start, input.stop, int64(i)+1)
+ actual := hasData(list, 0, 4)
+ if actual != input.expected {
+ t.Errorf("input [%d,%d) expected %v actual %v", input.start, input.stop, input.expected, actual)
+ }
+ }
+}
+
+func hasData(usage *ChunkWrittenIntervalList, chunkStartOffset, x int64) bool {
+ for t := usage.head.next; t != usage.tail; t = t.next {
+ logicStart := chunkStartOffset + t.StartOffset
+ logicStop := chunkStartOffset + t.stopOffset
+ if logicStart <= x && x < logicStop {
+ return true
+ }
+ }
+ return false
+}
diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go
index 44f879afc..7cddcf69e 100644
--- a/weed/mount/page_writer/dirty_pages.go
+++ b/weed/mount/page_writer/dirty_pages.go
@@ -1,9 +1,9 @@
package page_writer
type DirtyPages interface {
- AddPage(offset int64, data []byte, isSequential bool)
+ AddPage(offset int64, data []byte, isSequential bool, tsNs int64)
FlushData() error
- ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
+ ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64)
Destroy()
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)
diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go
index 4e8f31425..32d246deb 100644
--- a/weed/mount/page_writer/page_chunk.go
+++ b/weed/mount/page_writer/page_chunk.go
@@ -4,13 +4,13 @@ import (
"io"
)
-type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
+type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func())
type PageChunk interface {
FreeResource()
- WriteDataAt(src []byte, offset int64) (n int)
- ReadDataAt(p []byte, off int64) (maxStop int64)
+ WriteDataAt(src []byte, offset int64, tsNs int64) (n int)
+ ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool
- WrittenSize() int64
+ ActivityScore() int64
SaveContent(saveFn SaveToStorageFunc)
}
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
index 8cccded67..1ec8cecb4 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -19,6 +19,7 @@ type MemChunk struct {
usage *ChunkWrittenIntervalList
chunkSize int64
logicChunkIndex LogicChunkIndex
+ activityScore *ActivityScore
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
@@ -28,6 +29,7 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
chunkSize: chunkSize,
buf: mem.Allocate(int(chunkSize)),
usage: newChunkWrittenIntervalList(),
+ activityScore: NewActivityScore(),
}
}
@@ -39,29 +41,37 @@ func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf)
}
-func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
+func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
mc.Lock()
defer mc.Unlock()
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
- mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+ mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
+ mc.activityScore.MarkWrite()
+
return
}
-func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
mc.RLock()
defer mc.RUnlock()
memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
- logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
+ logicStart := max(off, memChunkBaseOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
- copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
- maxStop = max(maxStop, logicStop)
+ if t.TsNs >= tsNs {
+ copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
+ maxStop = max(maxStop, logicStop)
+ } else {
+ println("read old data1", tsNs-t.TsNs, "ns")
+ }
}
}
+ mc.activityScore.MarkRead()
+
return
}
@@ -72,11 +82,8 @@ func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
-func (mc *MemChunk) WrittenSize() int64 {
- mc.RLock()
- defer mc.RUnlock()
-
- return mc.usage.WrittenSize()
+func (mc *MemChunk) ActivityScore() int64 {
+ return mc.activityScore.ActivityScore()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
@@ -88,7 +95,7 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
- saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
+ saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() {
})
}
}
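ReadDataAt now takes the timestamp of the data already read from persisted chunks and copies only dirty intervals whose TsNs is at least that value, so a stale in-memory buffer can no longer mask newer data that was already flushed. A minimal sketch of that filter, where span is a hypothetical flattened interval rather than the linked list:

    package main

    import "fmt"

    type span struct {
    	start, stop, tsNs int64
    	data              []byte
    }

    // overlay copies only dirty spans at least as new as baseTsNs, mirroring
    // the t.TsNs >= tsNs check added to MemChunk.ReadDataAt
    func overlay(p []byte, off int64, baseTsNs int64, dirty []span) {
    	for _, s := range dirty {
    		if s.tsNs < baseTsNs {
    			continue // older than what was already read from storage: skip
    		}
    		start := max64(off, s.start)
    		stop := min64(off+int64(len(p)), s.stop)
    		if start < stop {
    			copy(p[start-off:stop-off], s.data[start-s.start:stop-s.start])
    		}
    	}
    }

    func max64(a, b int64) int64 {
    	if a > b {
    		return a
    	}
    	return b
    }

    func min64(a, b int64) int64 {
    	if a < b {
    		return a
    	}
    	return b
    }

    func main() {
    	p := []byte("AAAA") // pretend this came from persisted chunks written at ts=10
    	overlay(p, 0, 10, []span{
    		{0, 2, 5, []byte("xx")},  // stale dirty data, ignored
    		{2, 4, 12, []byte("yy")}, // newer dirty data, applied
    	})
    	fmt.Println(string(p)) // AAyy
    }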
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index bf2cdb256..6cedc64df 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -15,12 +15,12 @@ var (
type ActualChunkIndex int
type SwapFile struct {
- dir string
- file *os.File
- logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
- logicToActualChunkIndexLock sync.Mutex
- chunkSize int64
- freeActualChunkList []ActualChunkIndex
+ dir string
+ file *os.File
+ chunkSize int64
+ chunkTrackingLock sync.Mutex
+ activeChunkCount int
+ freeActualChunkList []ActualChunkIndex
}
type SwapFileChunk struct {
@@ -29,14 +29,15 @@ type SwapFileChunk struct {
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
+ activityScore *ActivityScore
+ //memChunk *MemChunk
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
- dir: dir,
- file: nil,
- logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
- chunkSize: chunkSize,
+ dir: dir,
+ file: nil,
+ chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
@@ -46,7 +47,7 @@ func (sf *SwapFile) FreeResource() {
}
}
-func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
+func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
@@ -55,70 +56,98 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
return nil
}
}
- sf.logicToActualChunkIndexLock.Lock()
- defer sf.logicToActualChunkIndexLock.Unlock()
- actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
- if !found {
- if len(sf.freeActualChunkList) > 0 {
- actualChunkIndex = sf.freeActualChunkList[0]
- sf.freeActualChunkList = sf.freeActualChunkList[1:]
- } else {
- actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
- }
- sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
+ sf.chunkTrackingLock.Lock()
+ defer sf.chunkTrackingLock.Unlock()
+
+ sf.activeChunkCount++
+
+ // assign a new physical chunk
+ var actualChunkIndex ActualChunkIndex
+ if len(sf.freeActualChunkList) > 0 {
+ actualChunkIndex = sf.freeActualChunkList[0]
+ sf.freeActualChunkList = sf.freeActualChunkList[1:]
+ } else {
+ actualChunkIndex = ActualChunkIndex(sf.activeChunkCount)
}
- return &SwapFileChunk{
+ swapFileChunk := &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
+ activityScore: NewActivityScore(),
+ // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize),
}
+
+ // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf)
+ return swapFileChunk
}
func (sc *SwapFileChunk) FreeResource() {
- sc.swapfile.logicToActualChunkIndexLock.Lock()
- defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
sc.Lock()
defer sc.Unlock()
+ sc.swapfile.chunkTrackingLock.Lock()
+ defer sc.swapfile.chunkTrackingLock.Unlock()
+
sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
- delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
+ sc.swapfile.activeChunkCount--
+ // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile)
}
-func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
+func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
sc.Lock()
defer sc.Unlock()
+ // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex)
+
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
- if err == nil {
- sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
- } else {
+ sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
+ if err != nil {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
+ //sc.memChunk.WriteDataAt(src, offset, tsNs)
+ sc.activityScore.MarkWrite()
+
return
}
-func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
sc.RLock()
defer sc.RUnlock()
+ // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex)
+
+ //memCopy := make([]byte, len(p))
+ //copy(memCopy, p)
+
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
- actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
- if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
- glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
- break
+ if t.TsNs >= tsNs {
+ actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
+ if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ } else {
+ println("read old data2", tsNs-t.TsNs, "ns")
}
- maxStop = max(maxStop, logicStop)
}
}
+ //sc.memChunk.ReadDataAt(memCopy, off, tsNs)
+ //if bytes.Compare(memCopy, p) != 0 {
+ // println("read wrong data from swap file", off, sc.logicChunkIndex)
+ //}
+
+ sc.activityScore.MarkRead()
+
return
}
@@ -128,27 +157,27 @@ func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
-func (sc *SwapFileChunk) WrittenSize() int64 {
- sc.RLock()
- defer sc.RUnlock()
- return sc.usage.WrittenSize()
+func (sc *SwapFileChunk) ActivityScore() int64 {
+ return sc.activityScore.ActivityScore()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
+ sc.RLock()
+ defer sc.RUnlock()
+
if saveFn == nil {
return
}
- sc.Lock()
- defer sc.Unlock()
-
+ // println(sc.logicChunkIndex, "|", "save")
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
- sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
- reader := util.NewBytesReader(data)
- saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
- })
+ n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
+ if n > 0 {
+ reader := util.NewBytesReader(data[:n])
+ saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() {
+ })
+ }
mem.Free(data)
}
- sc.usage = newChunkWrittenIntervalList()
}
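SwapFile no longer keeps a logical-to-physical chunk map; each NewSwapFileChunk call takes a slot from the free list or appends a new one, and FreeResource returns the slot. A compact sketch of that free-list bookkeeping, as a hypothetical reduction without the file I/O:

    package main

    import "fmt"

    // hypothetical reduction of the SwapFile slot bookkeeping
    type slotAllocator struct {
    	activeCount int
    	freeSlots   []int
    }

    func (a *slotAllocator) acquire() int {
    	a.activeCount++
    	if len(a.freeSlots) > 0 {
    		slot := a.freeSlots[0]
    		a.freeSlots = a.freeSlots[1:]
    		return slot // reuse a slot handed back earlier
    	}
    	return a.activeCount // otherwise take a fresh slot at the end of the swap file
    }

    func (a *slotAllocator) release(slot int) {
    	a.freeSlots = append(a.freeSlots, slot)
    	a.activeCount--
    }

    func main() {
    	a := &slotAllocator{}
    	s1 := a.acquire()
    	s2 := a.acquire()
    	a.release(s1)
    	s3 := a.acquire() // reuses the slot that s1 gave back
    	fmt.Println(s1, s2, s3)
    }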
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 252dddc06..6065f2f76 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "math"
"sync"
"sync/atomic"
)
@@ -55,7 +56,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
return t
}
-func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) {
+
up.chunksLock.Lock()
defer up.chunksLock.Unlock()
@@ -65,33 +67,39 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
if !found {
if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit
- fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
- for lci, mc := range up.writableChunks {
- chunkFullness := mc.WrittenSize()
- if fullness < chunkFullness {
- fullestChunkIndex = lci
- fullness = chunkFullness
+ laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
+ for wci, wc := range up.writableChunks {
+ activityScore := wc.ActivityScore()
+ if lowestActivityScore > activityScore {
+ laziestChunkIndex = wci
+ lowestActivityScore = activityScore
}
}
- up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
- // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
+ up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex)
+ // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
}
if isSequential &&
len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
- pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+ pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex)
}
up.writableChunks[logicChunkIndex] = pageChunk
}
- n = pageChunk.WriteDataAt(p, off)
+ //if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed {
+ // println("found already sealed chunk", logicChunkIndex)
+ //}
+ //if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading {
+ // println("found active read chunk", logicChunkIndex)
+ //}
+ n = pageChunk.WriteDataAt(p, off, tsNs)
up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return
}
-func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
up.chunksLock.Lock()
@@ -103,12 +111,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
// read from sealed chunks first
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
- sealedChunk.referenceCounter++
- }
- if found {
- maxStop = sealedChunk.chunk.ReadDataAt(p, off)
+ maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
- sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
@@ -116,7 +120,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
if !found {
return
}
- writableMaxStop := writableChunk.ReadDataAt(p, off)
+ writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)
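When a file holds more writable chunks than writableChunkLimit, the pipeline now seals the chunk with the lowest ActivityScore instead of the fullest one, on the theory that the least recently and least frequently touched chunk is the cheapest to flush. A sketch of that selection; the slice and struct are hypothetical stand-ins for the writableChunks map:

    package main

    import (
    	"fmt"
    	"math"
    )

    type chunkInfo struct {
    	index         int
    	activityScore int64
    }

    // pickLaziest mirrors the selection loop in SaveDataAt: lowest activity wins
    func pickLaziest(chunks []chunkInfo) int {
    	laziest, lowest := -1, int64(math.MaxInt64)
    	for _, c := range chunks {
    		if c.activityScore < lowest {
    			laziest, lowest = c.index, c.activityScore
    		}
    	}
    	return laziest
    }

    func main() {
    	chunks := []chunkInfo{{0, 900}, {1, 40}, {2, 512}}
    	fmt.Println(pickLaziest(chunks)) // 1: the coldest chunk gets sealed and uploaded
    }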
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
index 27da7036d..2d803f6af 100644
--- a/weed/mount/page_writer/upload_pipeline_test.go
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -31,14 +31,14 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
- uploadPipeline.SaveDataAt(p, i, false)
+ uploadPipeline.SaveDataAt(p, i, false, 0)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
- uploadPipeline.MaybeReadDataAt(p, i)
+ uploadPipeline.MaybeReadDataAt(p, i, 0)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go
index 1d58e0852..7dc3c6b50 100644
--- a/weed/mount/weedfs_attr.go
+++ b/weed/mount/weedfs_attr.go
@@ -20,12 +20,12 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
_, _, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, entry)
+ wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
return status
} else {
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry())
+ wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true)
out.Nlink = 0
return fuse.OK
}
@@ -75,7 +75,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
// set the new chunks and reset entry cache
entry.Chunks = chunks
if fh != nil {
- fh.entryViewCache = nil
+ fh.entryChunkGroup.SetChunks(chunks)
}
}
entry.Attributes.Mtime = time.Now().Unix()
@@ -114,7 +114,11 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
}
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
+ size, includeSize := input.GetSize()
+ if includeSize {
+ out.Attr.Size = size
+ }
+ wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry, !includeSize)
if fh != nil {
fh.dirtyMetadata = true
@@ -139,12 +143,14 @@ func (wfs *WFS) setRootAttr(out *fuse.AttrOut) {
out.Nlink = 1
}
-func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) {
+func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry, calculateSize bool) {
out.Ino = inode
if entry.Attributes != nil && entry.Attributes.Inode != 0 {
out.Ino = entry.Attributes.Inode
}
- out.Size = filer.FileSize(entry)
+ if calculateSize {
+ out.Size = filer.FileSize(entry)
+ }
if entry.FileMode()&os.ModeSymlink != 0 {
out.Size = uint64(len(entry.Attributes.SymlinkTarget))
}
@@ -194,7 +200,7 @@ func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.
out.Generation = 1
out.EntryValid = 1
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, entry)
+ wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
}
func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) {
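setAttrByPbEntry gains a calculateSize flag: GetAttr and Lookup still compute the size from the entry, but when SetAttr carries a size (a truncate or extend) the requested size is written into out.Attr.Size directly and the recomputation is skipped, so the reply reflects the just-applied truncation. A small sketch of that control flow with hypothetical types rather than the FUSE structs:

    package main

    import "fmt"

    type attr struct{ Size uint64 }

    func fileSizeFromChunks() uint64 { return 4096 } // stand-in for filer.FileSize(entry)

    func setAttr(out *attr, calculateSize bool) {
    	if calculateSize {
    		out.Size = fileSizeFromChunks()
    	}
    	// when calculateSize is false, the caller has already set out.Size
    }

    func main() {
    	var out attr
    	out.Size = 100        // size taken from the SetAttr request (truncate to 100)
    	setAttr(&out, false)  // keep it
    	fmt.Println(out.Size) // 100, not the 4096 computed from chunks
    }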
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index bc092a252..e3f841b02 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -1,8 +1,8 @@
package mount
import (
- "context"
"net/http"
+ "time"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -44,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
// lock source and target file handles
- fhOut.orderedMutex.Acquire(context.Background(), 1)
- defer fhOut.orderedMutex.Release(1)
+ fhOut.Lock()
+ defer fhOut.Unlock()
fhOut.entryLock.Lock()
defer fhOut.entryLock.Unlock()
@@ -54,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
if fhIn.fh != fhOut.fh {
- fhIn.orderedMutex.Acquire(context.Background(), 1)
- defer fhIn.orderedMutex.Release(1)
+ fhIn.Lock()
+ defer fhIn.Unlock()
fhIn.entryLock.Lock()
defer fhIn.entryLock.Unlock()
}
@@ -88,7 +88,7 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
// put data at the specified offset in target file
fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
fhOut.entry.Content = nil
- fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode())
+ fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
fhOut.dirtyMetadata = true
written = uint32(totalRead)
diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go
index 9d6402f96..93fc65247 100644
--- a/weed/mount/weedfs_file_lseek.go
+++ b/weed/mount/weedfs_file_lseek.go
@@ -1,7 +1,6 @@
package mount
import (
- "context"
"syscall"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -36,8 +35,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ fh.Lock()
+ defer fh.Unlock()
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
@@ -56,17 +55,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return ENXIO
}
- // refresh view cache if necessary
- if fh.entryViewCache == nil {
- var err error
- fh.entryViewCache, err = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), fh.entry.GetChunks(), 0, fileSize)
- if err != nil {
- return fuse.EIO
- }
- }
-
// search chunks for the offset
- found, offset := searchChunks(fh, offset, fileSize, in.Whence)
+ found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence)
if found {
out.Offset = uint64(offset)
return fuse.OK
@@ -82,30 +72,3 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return fuse.OK
}
-
-// searchChunks goes through all chunks to find the correct offset
-func searchChunks(fh *FileHandle, offset, fileSize int64, whence uint32) (found bool, out int64) {
- chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, offset, fileSize)
-
- for _, chunkView := range chunkViews {
- if offset < chunkView.LogicOffset {
- if whence == SEEK_HOLE {
- out = offset
- } else {
- out = chunkView.LogicOffset
- }
-
- return true, out
- }
-
- if offset >= chunkView.LogicOffset && offset < chunkView.Offset+int64(chunkView.Size) && whence == SEEK_DATA {
- out = offset
-
- return true, out
- }
-
- offset += int64(chunkView.Size)
- }
-
- return
-}
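The hand-rolled searchChunks helper is gone; Lseek now asks the handle's ChunkGroup to answer SEEK_DATA and SEEK_HOLE. For reference, the semantics being provided are the usual ones: SEEK_DATA returns the offset itself if it lies inside written data and otherwise the start of the next data region, while SEEK_HOLE does the mirror image. A hedged sketch over a sorted, coalesced extent list; this is an illustration of the semantics, not the filer.ChunkGroup implementation:

    package main

    import "fmt"

    const (
    	SEEK_DATA = 3
    	SEEK_HOLE = 4
    )

    type extent struct{ start, stop int64 }

    // seek answers SEEK_DATA / SEEK_HOLE over sorted, non-adjacent extents
    func seek(extents []extent, offset int64, whence int) (int64, bool) {
    	for _, e := range extents {
    		if offset < e.start { // offset sits in a hole before this extent
    			if whence == SEEK_HOLE {
    				return offset, true
    			}
    			return e.start, true // SEEK_DATA: next data begins here
    		}
    		if offset < e.stop { // offset sits inside this extent's data
    			if whence == SEEK_DATA {
    				return offset, true
    			}
    			return e.stop, true // SEEK_HOLE: hole begins where the extent ends
    		}
    	}
    	if whence == SEEK_HOLE {
    		return offset, true // trailing hole past the last extent
    	}
    	return 0, false // SEEK_DATA past end of data: ENXIO in the FUSE handler
    }

    func main() {
    	extents := []extent{{0, 10}, {20, 30}}
    	fmt.Println(seek(extents, 12, SEEK_DATA)) // 20 true
    	fmt.Println(seek(extents, 5, SEEK_HOLE))  // 10 true
    }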
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index 8375f9a5d..cedece137 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -1,7 +1,8 @@
package mount
import (
- "context"
+ "bytes"
+ "fmt"
"io"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -40,8 +41,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ fh.Lock()
+ defer fh.Unlock()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
@@ -50,6 +51,23 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.EIO
}
+ if IsDebugFileReadWrite {
+ // print(".")
+ mirrorData := make([]byte, totalRead)
+ fh.mirrorFile.ReadAt(mirrorData, offset)
+ if bytes.Compare(mirrorData, buff[:totalRead]) != 0 {
+
+ againBuff := make([]byte, len(buff))
+ againRead, _ := readDataByFileHandle(buff, fh, offset)
+ againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0
+ againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0
+
+ fmt.Printf("\ncompare %v [%d,%d) size:%d againSame:%v againCorrect:%v\n", fh.mirrorFile.Name(), offset, offset+totalRead, totalRead, againSame, againCorrect)
+ //fmt.Printf("read mirrow data: %v\n", mirrorData)
+ //fmt.Printf("read actual data: %v\n", buff[:totalRead])
+ }
+ }
+
return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
@@ -59,9 +77,9 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
fhIn.lockForRead(offset, size)
defer fhIn.unlockForRead(offset, size)
- n, err := fhIn.readFromChunks(buff, offset)
+ n, tsNs, err := fhIn.readFromChunks(buff, offset)
if err == nil || err == io.EOF {
- maxStop := fhIn.readFromDirtyPages(buff, offset)
+ maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
n = max(maxStop-offset, n)
}
if err == io.EOF {
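readDataByFileHandle now receives the timestamp of the persisted data it just read and hands it to readFromDirtyPages, so the dirty-page overlay applies only buffers at least as new as that data; together with the interval timestamps above, this closes the window where an already-flushed newer chunk could be masked by an older dirty buffer. A condensed sketch of the two-step read, with hypothetical stand-ins for the handle methods:

    package main

    import "fmt"

    // step 1: read from persisted chunks, reporting how new that data is
    func readFromChunks(buff []byte) (n int64, tsNs int64) {
    	copy(buff, []byte("persisted"))
    	return int64(len("persisted")), 100 // data on the filer was written at ts=100
    }

    // step 2: overlay dirty pages, but only those written at or after baseTsNs
    func readFromDirtyPages(buff []byte, baseTsNs int64) (maxStop int64) {
    	dirtyTsNs := int64(120) // this buffered write is newer, so it wins
    	if dirtyTsNs >= baseTsNs {
    		copy(buff[:5], []byte("DIRTY"))
    		return 5
    	}
    	return 0
    }

    func main() {
    	buff := make([]byte, 16)
    	n, tsNs := readFromChunks(buff)
    	if maxStop := readFromDirtyPages(buff, tsNs); maxStop > n {
    		n = maxStop
    	}
    	fmt.Println(string(buff[:n])) // the newer dirty bytes overlay the persisted ones
    }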
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 7b7c66680..ac18e05ea 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -89,8 +89,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
}
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ fh.Lock()
+ defer fh.Unlock()
// flush works at fh level
fileFullPath := fh.FullPath()
@@ -145,9 +145,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.GetChunks()))
- for i, chunk := range entry.GetChunks() {
- glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
+ //for i, chunk := range entry.GetChunks() {
+ // glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
+ //}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks())
@@ -158,6 +158,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
entry.Chunks = append(chunks, manifestChunks...)
+ fh.entryChunkGroup.SetChunks(entry.Chunks)
wfs.mapPbIdFromLocalToFiler(request.Entry)
defer wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -181,5 +182,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
return fuse.EIO
}
+ if IsDebugFileReadWrite {
+ fh.mirrorFile.Sync()
+ }
+
return fuse.OK
}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 7b13d54ff..5a9a21ded 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -1,10 +1,10 @@
package mount
import (
- "context"
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"syscall"
+ "time"
)
/**
@@ -46,8 +46,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ tsNs := time.Now().UnixNano()
+
+ fh.Lock()
+ defer fh.Unlock()
entry := fh.GetEntry()
if entry == nil {
@@ -59,7 +61,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
+ fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs)
written = uint32(len(data))
@@ -70,5 +72,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyMetadata = true
+ if IsDebugFileReadWrite {
+ // print("+")
+ fh.mirrorFile.WriteAt(data, offset)
+ }
+
return written, fuse.OK
}
diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go
index e18a4a358..4c8470245 100644
--- a/weed/mount/weedfs_write.go
+++ b/weed/mount/weedfs_write.go
@@ -13,7 +13,7 @@ import (
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
- return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) {
+ return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, err, data := operation.UploadWithRetry(
wfs,
@@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
wfs.chunkCache.SetChunk(fileId, data)
}
- chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs)
return chunk, nil
}
}