diff options
Diffstat (limited to 'weed/mount')
| -rw-r--r-- | weed/mount/dirty_pages_chunked.go | 16 | ||||
| -rw-r--r-- | weed/mount/filehandle.go | 109 | ||||
| -rw-r--r-- | weed/mount/filehandle_map.go | 4 | ||||
| -rw-r--r-- | weed/mount/filehandle_read.go | 43 | ||||
| -rw-r--r-- | weed/mount/page_writer.go | 12 | ||||
| -rw-r--r-- | weed/mount/page_writer/activity_score.go | 39 | ||||
| -rw-r--r-- | weed/mount/page_writer/chunk_interval_list.go | 83 | ||||
| -rw-r--r-- | weed/mount/page_writer/chunk_interval_list_test.go | 72 | ||||
| -rw-r--r-- | weed/mount/page_writer/dirty_pages.go | 4 | ||||
| -rw-r--r-- | weed/mount/page_writer/page_chunk.go | 8 | ||||
| -rw-r--r-- | weed/mount/page_writer/page_chunk_mem.go | 31 | ||||
| -rw-r--r-- | weed/mount/page_writer/page_chunk_swapfile.go | 125 | ||||
| -rw-r--r-- | weed/mount/page_writer/upload_pipeline.go | 40 | ||||
| -rw-r--r-- | weed/mount/page_writer/upload_pipeline_test.go | 4 | ||||
| -rw-r--r-- | weed/mount/weedfs_attr.go | 20 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_copy_range.go | 12 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_lseek.go | 43 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_read.go | 28 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_sync.go | 15 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_write.go | 15 | ||||
| -rw-r--r-- | weed/mount/weedfs_write.go | 4 |
21 files changed, 413 insertions, 314 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index 78e7b7877..56c97549f 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -7,7 +7,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "io" "sync" - "time" ) type ChunkedDirtyPages struct { @@ -38,11 +37,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { return dirtyPages } -func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) { +func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) { pages.hasWrites = true glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data))) - pages.uploadPipeline.SaveDataAt(data, offset, isSequential) + pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs) return } @@ -58,28 +57,27 @@ func (pages *ChunkedDirtyPages) FlushData() error { return nil } -func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { +func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) { if !pages.hasWrites { return } - return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) + return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs) } -func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { +func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) { - mtime := time.Now().UnixNano() defer cleanupFn() fileFullPath := pages.fh.FullPath() fileName := fileFullPath.Name() - chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset) + chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs) if err != nil { glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err) pages.lastErr = err return } - chunk.ModifiedTsNs = mtime pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryChunkGroup.AddChunk(chunk) glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) } diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index b6ec3d2da..67298b047 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -5,50 +5,60 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "golang.org/x/exp/slices" - "golang.org/x/sync/semaphore" - "math" + "os" "sync" ) type FileHandleId uint64 +var IsDebugFileReadWrite = false + type FileHandle struct { - fh FileHandleId - counter int64 - entry *LockedEntry - entryLock sync.Mutex - inode uint64 - wfs *WFS + fh FileHandleId + counter int64 + entry *LockedEntry + entryLock sync.Mutex + entryChunkGroup *filer.ChunkGroup + inode uint64 + wfs *WFS // cache file has been written to - dirtyMetadata bool - dirtyPages *PageWriter - entryViewCache []filer.VisibleInterval - reader *filer.ChunkReadAt - contentType string - handle uint64 - orderedMutex *semaphore.Weighted + dirtyMetadata bool + dirtyPages *PageWriter + reader *filer.ChunkReadAt + contentType string + handle uint64 + sync.Mutex isDeleted bool + + // for debugging + mirrorFile *os.File } func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle { fh := &FileHandle{ - fh: handleId, - counter: 1, - inode: inode, - wfs: wfs, - orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)), + fh: handleId, + counter: 1, + inode: inode, + wfs: wfs, } // dirtyPages: newContinuousDirtyPages(file, writeOnly), fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit) - if entry != nil { - entry.Attributes.FileSize = filer.FileSize(entry) - } fh.entry = &LockedEntry{ Entry: entry, } + if entry != nil { + fh.SetEntry(entry) + } + + if IsDebugFileReadWrite { + var err error + fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + println("failed to create mirror:", err.Error()) + } + } return fh } @@ -63,6 +73,17 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry { } func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { + if entry != nil { + fileSize := filer.FileSize(entry) + entry.Attributes.FileSize = fileSize + var resolveManifestErr error + fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks) + if resolveManifestErr != nil { + glog.Warningf("failed to resolve manifest chunks in %+v", entry) + } + } else { + glog.Fatalf("setting file handle entry to nil") + } fh.entry.SetEntry(entry) } @@ -78,43 +99,17 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { return } - // find the earliest incoming chunk - newChunks := chunks - earliestChunk := newChunks[0] - for i := 1; i < len(newChunks); i++ { - if lessThan(earliestChunk, newChunks[i]) { - earliestChunk = newChunks[i] - } - } - - // pick out-of-order chunks from existing chunks - for _, chunk := range fh.entry.GetChunks() { - if lessThan(earliestChunk, chunk) { - chunks = append(chunks, chunk) - } - } - - // sort incoming chunks - slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool { - return lessThan(a, b) - }) - - glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks)) - - fh.entry.AppendChunks(newChunks) - fh.entryViewCache = nil + fh.entry.AppendChunks(chunks) } -func (fh *FileHandle) CloseReader() { - if fh.reader != nil { - _ = fh.reader.Close() - fh.reader = nil - } -} +func (fh *FileHandle) ReleaseHandle() { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() -func (fh *FileHandle) Release() { fh.dirtyPages.Destroy() - fh.CloseReader() + if IsDebugFileReadWrite { + fh.mirrorFile.Close() + } } func lessThan(a, b *filer_pb.FileChunk) bool { diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go index cc5885ffc..f0051f061 100644 --- a/weed/mount/filehandle_map.go +++ b/weed/mount/filehandle_map.go @@ -65,7 +65,7 @@ func (i *FileHandleToInode) ReleaseByInode(inode uint64) { if fh.counter <= 0 { delete(i.inode2fh, inode) delete(i.fh2inode, fh.fh) - fh.Release() + fh.ReleaseHandle() } } } @@ -82,7 +82,7 @@ func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) { if fhHandle.counter <= 0 { delete(i.inode2fh, inode) delete(i.fh2inode, fhHandle.fh) - fhHandle.Release() + fhHandle.ReleaseHandle() } } diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index a316a16cd..be6d5d984 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -17,18 +17,20 @@ func (fh *FileHandle) unlockForRead(startOffset int64, size int) { fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) } -func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { - maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs int64) (maxStop int64) { + maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset, tsNs) return } -func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { +func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() fileFullPath := fh.FullPath() entry := fh.GetEntry() if entry == nil { - return 0, io.EOF + return 0, 0, io.EOF } if entry.IsInRemoteOnly() { @@ -36,43 +38,28 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { newEntry, err := fh.downloadRemoteEntry(entry) if err != nil { glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err) - return 0, err + return 0, 0, err } entry = newEntry } - fileSize := int64(filer.FileSize(entry)) + fileSize := int64(entry.Attributes.FileSize) + if fileSize == 0 { + fileSize = int64(filer.FileSize(entry)) + } if fileSize == 0 { glog.V(1).Infof("empty fh %v", fileFullPath) - return 0, io.EOF + return 0, 0, io.EOF } if offset+int64(len(buff)) <= int64(len(entry.Content)) { totalRead := copy(buff, entry.Content[offset:]) glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) - return int64(totalRead), nil - } - - var chunkResolveErr error - if fh.entryViewCache == nil { - fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize) - if chunkResolveErr != nil { - return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) - } - fh.CloseReader() - } - - if fh.reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize) - glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews)) - for _, chunkView := range chunkViews { - glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId) - } - fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize) + return int64(totalRead), 0, nil } - totalRead, err := fh.reader.ReadAt(buff, offset) + totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset) if err != nil && err != io.EOF { glog.Errorf("file handle read %s: %v", fileFullPath, err) @@ -80,7 +67,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) - return int64(totalRead), err + return int64(totalRead), ts, err } func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index 1f31b5300..c9470c440 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -29,35 +29,35 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { return pw } -func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) { +func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) { glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - pw.addToOneChunk(i, offset, data[:writeSize], isSequential) + pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs) offset += writeSize data = data[writeSize:] } } -func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) { - pw.randomWriter.AddPage(offset, data, isSequential) +func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) { + pw.randomWriter.AddPage(offset, data, isSequential, tsNs) } func (pw *PageWriter) FlushData() error { return pw.randomWriter.FlushData() } -func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { +func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64, tsNs int64) (maxStop int64) { glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset, tsNs) offset += readSize data = data[readSize:] diff --git a/weed/mount/page_writer/activity_score.go b/weed/mount/page_writer/activity_score.go new file mode 100644 index 000000000..22da87e37 --- /dev/null +++ b/weed/mount/page_writer/activity_score.go @@ -0,0 +1,39 @@ +package page_writer + +import "time" + +type ActivityScore struct { + lastActiveTsNs int64 + decayedActivenessScore int64 +} + +func NewActivityScore() *ActivityScore { + return &ActivityScore{} +} + +func (as ActivityScore) MarkRead() { + now := time.Now().UnixNano() + deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds + as.lastActiveTsNs = now + + as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 256 + if as.decayedActivenessScore < 0 { + as.decayedActivenessScore = 0 + } +} + +func (as ActivityScore) MarkWrite() { + now := time.Now().UnixNano() + deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds + as.lastActiveTsNs = now + + as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 1024 + if as.decayedActivenessScore < 0 { + as.decayedActivenessScore = 0 + } +} + +func (as ActivityScore) ActivityScore() int64 { + deltaTime := (time.Now().UnixNano() - as.lastActiveTsNs) >> 30 // about number of seconds + return as.decayedActivenessScore >> deltaTime +} diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go index a9d64c8e4..005385c1a 100644 --- a/weed/mount/page_writer/chunk_interval_list.go +++ b/weed/mount/page_writer/chunk_interval_list.go @@ -8,6 +8,7 @@ import ( type ChunkWrittenInterval struct { StartOffset int64 stopOffset int64 + TsNs int64 prev *ChunkWrittenInterval next *ChunkWrittenInterval } @@ -42,10 +43,14 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { return list } -func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { +func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) { + if startOffset >= stopOffset { + return + } interval := &ChunkWrittenInterval{ StartOffset: startOffset, stopOffset: stopOffset, + TsNs: tsNs, } list.addInterval(interval) } @@ -62,50 +67,54 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { + //t := list.head + //for ; t.next != nil; t = t.next { + // if t.TsNs > interval.TsNs { + // println("writes is out of order", t.TsNs-interval.TsNs, "ns") + // } + //} + p := list.head - for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next { + for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next { } q := list.tail - for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { + for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev { } - if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { - // merge p and q together - p.stopOffset = q.stopOffset - unlinkNodesBetween(p, q.next) - return + // left side + // interval after p.next start + if p.next.StartOffset < interval.StartOffset { + t := &ChunkWrittenInterval{ + StartOffset: p.next.StartOffset, + stopOffset: interval.StartOffset, + TsNs: p.next.TsNs, + } + p.next = t + t.prev = p + t.next = interval + interval.prev = t + } else { + p.next = interval + interval.prev = p } - if interval.StartOffset <= p.stopOffset { - // merge new interval into p - p.stopOffset = interval.stopOffset - unlinkNodesBetween(p, q) - return - } - if q.StartOffset <= interval.stopOffset { - // merge new interval into q - q.StartOffset = interval.StartOffset - unlinkNodesBetween(p, q) - return - } - - // add the new interval between p and q - unlinkNodesBetween(p, q) - p.next = interval - interval.prev = p - q.prev = interval - interval.next = q - -} -// unlinkNodesBetween remove all nodes after start and before stop, exclusive -func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) { - if start.next == stop { - return + // right side + // interval ends before p.prev + if interval.stopOffset < q.prev.stopOffset { + t := &ChunkWrittenInterval{ + StartOffset: interval.stopOffset, + stopOffset: q.prev.stopOffset, + TsNs: q.prev.TsNs, + } + q.prev = t + t.next = q + interval.next = t + t.prev = interval + } else { + q.prev = interval + interval.next = q } - start.next.prev = nil - start.next = stop - stop.prev.next = nil - stop.prev = start + } func (list *ChunkWrittenIntervalList) size() int { diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go index b22f5eb5d..eb1d5ff46 100644 --- a/weed/mount/page_writer/chunk_interval_list_test.go +++ b/weed/mount/page_writer/chunk_interval_list_test.go @@ -10,40 +10,72 @@ func Test_PageChunkWrittenIntervalList(t *testing.T) { assert.Equal(t, 0, list.size(), "empty list") - list.MarkWritten(0, 5) + list.MarkWritten(0, 5, 1) assert.Equal(t, 1, list.size(), "one interval") - list.MarkWritten(0, 5) + list.MarkWritten(0, 5, 2) assert.Equal(t, 1, list.size(), "duplicated interval2") - list.MarkWritten(95, 100) + list.MarkWritten(95, 100, 3) assert.Equal(t, 2, list.size(), "two intervals") - list.MarkWritten(50, 60) + list.MarkWritten(50, 60, 4) assert.Equal(t, 3, list.size(), "three intervals") - list.MarkWritten(50, 55) - assert.Equal(t, 3, list.size(), "three intervals merge") + list.MarkWritten(50, 55, 5) + assert.Equal(t, 4, list.size(), "three intervals merge") - list.MarkWritten(40, 50) - assert.Equal(t, 3, list.size(), "three intervals grow forward") + list.MarkWritten(40, 50, 6) + assert.Equal(t, 5, list.size(), "three intervals grow forward") - list.MarkWritten(50, 65) - assert.Equal(t, 3, list.size(), "three intervals grow backward") + list.MarkWritten(50, 65, 7) + assert.Equal(t, 4, list.size(), "three intervals grow backward") - list.MarkWritten(70, 80) - assert.Equal(t, 4, list.size(), "four intervals") + list.MarkWritten(70, 80, 8) + assert.Equal(t, 5, list.size(), "four intervals") - list.MarkWritten(60, 70) - assert.Equal(t, 3, list.size(), "three intervals merged") + list.MarkWritten(60, 70, 9) + assert.Equal(t, 6, list.size(), "three intervals merged") - list.MarkWritten(59, 71) - assert.Equal(t, 3, list.size(), "covered three intervals") + list.MarkWritten(59, 71, 10) + assert.Equal(t, 6, list.size(), "covered three intervals") - list.MarkWritten(5, 59) - assert.Equal(t, 2, list.size(), "covered two intervals") + list.MarkWritten(5, 59, 11) + assert.Equal(t, 5, list.size(), "covered two intervals") - list.MarkWritten(70, 99) - assert.Equal(t, 1, list.size(), "covered one intervals") + list.MarkWritten(70, 99, 12) + assert.Equal(t, 5, list.size(), "covered one intervals") } + +type interval struct { + start int64 + stop int64 + expected bool +} + +func Test_PageChunkWrittenIntervalList1(t *testing.T) { + list := newChunkWrittenIntervalList() + inputs := []interval{ + {1, 5, true}, + {2, 3, true}, + } + for i, input := range inputs { + list.MarkWritten(input.start, input.stop, int64(i)+1) + actual := hasData(list, 0, 4) + if actual != input.expected { + t.Errorf("input [%d,%d) expected %v actual %v", input.start, input.stop, input.expected, actual) + } + } +} + +func hasData(usage *ChunkWrittenIntervalList, chunkStartOffset, x int64) bool { + for t := usage.head.next; t != usage.tail; t = t.next { + logicStart := chunkStartOffset + t.StartOffset + logicStop := chunkStartOffset + t.stopOffset + if logicStart <= x && x < logicStop { + return true + } + } + return false +} diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go index 44f879afc..7cddcf69e 100644 --- a/weed/mount/page_writer/dirty_pages.go +++ b/weed/mount/page_writer/dirty_pages.go @@ -1,9 +1,9 @@ package page_writer type DirtyPages interface { - AddPage(offset int64, data []byte, isSequential bool) + AddPage(offset int64, data []byte, isSequential bool, tsNs int64) FlushData() error - ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) + ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) Destroy() LockForRead(startOffset, stopOffset int64) UnlockForRead(startOffset, stopOffset int64) diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go index 4e8f31425..32d246deb 100644 --- a/weed/mount/page_writer/page_chunk.go +++ b/weed/mount/page_writer/page_chunk.go @@ -4,13 +4,13 @@ import ( "io" ) -type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) +type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) type PageChunk interface { FreeResource() - WriteDataAt(src []byte, offset int64) (n int) - ReadDataAt(p []byte, off int64) (maxStop int64) + WriteDataAt(src []byte, offset int64, tsNs int64) (n int) + ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) IsComplete() bool - WrittenSize() int64 + ActivityScore() int64 SaveContent(saveFn SaveToStorageFunc) } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index 8cccded67..1ec8cecb4 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -19,6 +19,7 @@ type MemChunk struct { usage *ChunkWrittenIntervalList chunkSize int64 logicChunkIndex LogicChunkIndex + activityScore *ActivityScore } func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { @@ -28,6 +29,7 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { chunkSize: chunkSize, buf: mem.Allocate(int(chunkSize)), usage: newChunkWrittenIntervalList(), + activityScore: NewActivityScore(), } } @@ -39,29 +41,37 @@ func (mc *MemChunk) FreeResource() { mem.Free(mc.buf) } -func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { +func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { mc.Lock() defer mc.Unlock() innerOffset := offset % mc.chunkSize n = copy(mc.buf[innerOffset:], src) - mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) + mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs) + mc.activityScore.MarkWrite() + return } -func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { +func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { mc.RLock() defer mc.RUnlock() memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { - logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) + logicStart := max(off, memChunkBaseOffset+t.StartOffset) logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) + if t.TsNs >= tsNs { + copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } else { + println("read old data1", tsNs-t.TsNs, "ns") + } } } + mc.activityScore.MarkRead() + return } @@ -72,11 +82,8 @@ func (mc *MemChunk) IsComplete() bool { return mc.usage.IsComplete(mc.chunkSize) } -func (mc *MemChunk) WrittenSize() int64 { - mc.RLock() - defer mc.RUnlock() - - return mc.usage.WrittenSize() +func (mc *MemChunk) ActivityScore() int64 { + return mc.activityScore.ActivityScore() } func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { @@ -88,7 +95,7 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { } for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) - saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() { + saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() { }) } } diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index bf2cdb256..6cedc64df 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -15,12 +15,12 @@ var ( type ActualChunkIndex int type SwapFile struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - logicToActualChunkIndexLock sync.Mutex - chunkSize int64 - freeActualChunkList []ActualChunkIndex + dir string + file *os.File + chunkSize int64 + chunkTrackingLock sync.Mutex + activeChunkCount int + freeActualChunkList []ActualChunkIndex } type SwapFileChunk struct { @@ -29,14 +29,15 @@ type SwapFileChunk struct { usage *ChunkWrittenIntervalList logicChunkIndex LogicChunkIndex actualChunkIndex ActualChunkIndex + activityScore *ActivityScore + //memChunk *MemChunk } func NewSwapFile(dir string, chunkSize int64) *SwapFile { return &SwapFile{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - chunkSize: chunkSize, + dir: dir, + file: nil, + chunkSize: chunkSize, } } func (sf *SwapFile) FreeResource() { @@ -46,7 +47,7 @@ func (sf *SwapFile) FreeResource() { } } -func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { +func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { if sf.file == nil { var err error sf.file, err = os.CreateTemp(sf.dir, "") @@ -55,70 +56,98 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF return nil } } - sf.logicToActualChunkIndexLock.Lock() - defer sf.logicToActualChunkIndexLock.Unlock() - actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] - if !found { - if len(sf.freeActualChunkList) > 0 { - actualChunkIndex = sf.freeActualChunkList[0] - sf.freeActualChunkList = sf.freeActualChunkList[1:] - } else { - actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) - } - sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex + sf.chunkTrackingLock.Lock() + defer sf.chunkTrackingLock.Unlock() + + sf.activeChunkCount++ + + // assign a new physical chunk + var actualChunkIndex ActualChunkIndex + if len(sf.freeActualChunkList) > 0 { + actualChunkIndex = sf.freeActualChunkList[0] + sf.freeActualChunkList = sf.freeActualChunkList[1:] + } else { + actualChunkIndex = ActualChunkIndex(sf.activeChunkCount) } - return &SwapFileChunk{ + swapFileChunk := &SwapFileChunk{ swapfile: sf, usage: newChunkWrittenIntervalList(), logicChunkIndex: logicChunkIndex, actualChunkIndex: actualChunkIndex, + activityScore: NewActivityScore(), + // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize), } + + // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf) + return swapFileChunk } func (sc *SwapFileChunk) FreeResource() { - sc.swapfile.logicToActualChunkIndexLock.Lock() - defer sc.swapfile.logicToActualChunkIndexLock.Unlock() sc.Lock() defer sc.Unlock() + sc.swapfile.chunkTrackingLock.Lock() + defer sc.swapfile.chunkTrackingLock.Unlock() + sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex) - delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex) + sc.swapfile.activeChunkCount-- + // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile) } -func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { +func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { sc.Lock() defer sc.Unlock() + // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex) + innerOffset := offset % sc.swapfile.chunkSize var err error n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) - if err == nil { - sc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) - } else { + sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs) + if err != nil { glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err) } + //sc.memChunk.WriteDataAt(src, offset, tsNs) + sc.activityScore.MarkWrite() + return } -func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { +func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { sc.RLock() defer sc.RUnlock() + // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex) + + //memCopy := make([]byte, len(p)) + //copy(memCopy, p) + chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { logicStart := max(off, chunkStartOffset+t.StartOffset) logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) if logicStart < logicStop { - actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { - glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) - break + if t.TsNs >= tsNs { + actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize + if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) + break + } + maxStop = max(maxStop, logicStop) + } else { + println("read old data2", tsNs-t.TsNs, "ns") } - maxStop = max(maxStop, logicStop) } } + //sc.memChunk.ReadDataAt(memCopy, off, tsNs) + //if bytes.Compare(memCopy, p) != 0 { + // println("read wrong data from swap file", off, sc.logicChunkIndex) + //} + + sc.activityScore.MarkRead() + return } @@ -128,27 +157,27 @@ func (sc *SwapFileChunk) IsComplete() bool { return sc.usage.IsComplete(sc.swapfile.chunkSize) } -func (sc *SwapFileChunk) WrittenSize() int64 { - sc.RLock() - defer sc.RUnlock() - return sc.usage.WrittenSize() +func (sc *SwapFileChunk) ActivityScore() int64 { + return sc.activityScore.ActivityScore() } func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { + sc.RLock() + defer sc.RUnlock() + if saveFn == nil { return } - sc.Lock() - defer sc.Unlock() - + // println(sc.logicChunkIndex, "|", "save") for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { data := mem.Allocate(int(t.Size())) - sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) - reader := util.NewBytesReader(data) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() { - }) + n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) + if n > 0 { + reader := util.NewBytesReader(data[:n]) + saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() { + }) + } mem.Free(data) } - sc.usage = newChunkWrittenIntervalList() } diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 252dddc06..6065f2f76 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" + "math" "sync" "sync/atomic" ) @@ -55,7 +56,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, return t } -func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) { +func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) { + up.chunksLock.Lock() defer up.chunksLock.Unlock() @@ -65,33 +67,39 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n if !found { if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit - fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) - for lci, mc := range up.writableChunks { - chunkFullness := mc.WrittenSize() - if fullness < chunkFullness { - fullestChunkIndex = lci - fullness = chunkFullness + laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64) + for wci, wc := range up.writableChunks { + activityScore := wc.ActivityScore() + if lowestActivityScore > activityScore { + laziestChunkIndex = wci + lowestActivityScore = activityScore } } - up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) - // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) + up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex) + // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs) } if isSequential && len(up.writableChunks) < up.writableChunkLimit && atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { - pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) + pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex) } up.writableChunks[logicChunkIndex] = pageChunk } - n = pageChunk.WriteDataAt(p, off) + //if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed { + // println("found already sealed chunk", logicChunkIndex) + //} + //if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading { + // println("found active read chunk", logicChunkIndex) + //} + n = pageChunk.WriteDataAt(p, off, tsNs) up.maybeMoveToSealed(pageChunk, logicChunkIndex) return } -func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { +func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) up.chunksLock.Lock() @@ -103,12 +111,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { // read from sealed chunks first sealedChunk, found := up.sealedChunks[logicChunkIndex] if found { - sealedChunk.referenceCounter++ - } - if found { - maxStop = sealedChunk.chunk.ReadDataAt(p, off) + maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) - sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) } // read from writable chunks last @@ -116,7 +120,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { if !found { return } - writableMaxStop := writableChunk.ReadDataAt(p, off) + writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs) glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) maxStop = max(maxStop, writableMaxStop) diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go index 27da7036d..2d803f6af 100644 --- a/weed/mount/page_writer/upload_pipeline_test.go +++ b/weed/mount/page_writer/upload_pipeline_test.go @@ -31,14 +31,14 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { p := make([]byte, 4) for i := startOff / 4; i < stopOff/4; i += 4 { util.Uint32toBytes(p, uint32(i)) - uploadPipeline.SaveDataAt(p, i, false) + uploadPipeline.SaveDataAt(p, i, false, 0) } } func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { p := make([]byte, 4) for i := startOff; i < stopOff/4; i += 4 { - uploadPipeline.MaybeReadDataAt(p, i) + uploadPipeline.MaybeReadDataAt(p, i, 0) x := util.BytesToUint32(p) if x != uint32(i) { t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 1d58e0852..7dc3c6b50 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -20,12 +20,12 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse _, _, entry, status := wfs.maybeReadEntry(inode) if status == fuse.OK { out.AttrValid = 1 - wfs.setAttrByPbEntry(&out.Attr, inode, entry) + wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) return status } else { if fh, found := wfs.fhmap.FindFileHandle(inode); found { out.AttrValid = 1 - wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry()) + wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true) out.Nlink = 0 return fuse.OK } @@ -75,7 +75,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse // set the new chunks and reset entry cache entry.Chunks = chunks if fh != nil { - fh.entryViewCache = nil + fh.entryChunkGroup.SetChunks(chunks) } } entry.Attributes.Mtime = time.Now().Unix() @@ -114,7 +114,11 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse } out.AttrValid = 1 - wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry) + size, includeSize := input.GetSize() + if includeSize { + out.Attr.Size = size + } + wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry, !includeSize) if fh != nil { fh.dirtyMetadata = true @@ -139,12 +143,14 @@ func (wfs *WFS) setRootAttr(out *fuse.AttrOut) { out.Nlink = 1 } -func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) { +func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry, calculateSize bool) { out.Ino = inode if entry.Attributes != nil && entry.Attributes.Inode != 0 { out.Ino = entry.Attributes.Inode } - out.Size = filer.FileSize(entry) + if calculateSize { + out.Size = filer.FileSize(entry) + } if entry.FileMode()&os.ModeSymlink != 0 { out.Size = uint64(len(entry.Attributes.SymlinkTarget)) } @@ -194,7 +200,7 @@ func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb. out.Generation = 1 out.EntryValid = 1 out.AttrValid = 1 - wfs.setAttrByPbEntry(&out.Attr, inode, entry) + wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) } func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) { diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index bc092a252..e3f841b02 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -1,8 +1,8 @@ package mount import ( - "context" "net/http" + "time" "github.com/hanwen/go-fuse/v2/fuse" @@ -44,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) } // lock source and target file handles - fhOut.orderedMutex.Acquire(context.Background(), 1) - defer fhOut.orderedMutex.Release(1) + fhOut.Lock() + defer fhOut.Unlock() fhOut.entryLock.Lock() defer fhOut.entryLock.Unlock() @@ -54,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) } if fhIn.fh != fhOut.fh { - fhIn.orderedMutex.Acquire(context.Background(), 1) - defer fhIn.orderedMutex.Release(1) + fhIn.Lock() + defer fhIn.Unlock() fhIn.entryLock.Lock() defer fhIn.entryLock.Unlock() } @@ -88,7 +88,7 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) // put data at the specified offset in target file fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len)) fhOut.entry.Content = nil - fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode()) + fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano()) fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize))) fhOut.dirtyMetadata = true written = uint32(totalRead) diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index 9d6402f96..93fc65247 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -1,7 +1,6 @@ package mount import ( - "context" "syscall" "github.com/hanwen/go-fuse/v2/fuse" @@ -36,8 +35,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO } // lock the file until the proper offset was calculated - fh.orderedMutex.Acquire(context.Background(), 1) - defer fh.orderedMutex.Release(1) + fh.Lock() + defer fh.Unlock() fh.entryLock.Lock() defer fh.entryLock.Unlock() @@ -56,17 +55,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO return ENXIO } - // refresh view cache if necessary - if fh.entryViewCache == nil { - var err error - fh.entryViewCache, err = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), fh.entry.GetChunks(), 0, fileSize) - if err != nil { - return fuse.EIO - } - } - // search chunks for the offset - found, offset := searchChunks(fh, offset, fileSize, in.Whence) + found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence) if found { out.Offset = uint64(offset) return fuse.OK @@ -82,30 +72,3 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO return fuse.OK } - -// searchChunks goes through all chunks to find the correct offset -func searchChunks(fh *FileHandle, offset, fileSize int64, whence uint32) (found bool, out int64) { - chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, offset, fileSize) - - for _, chunkView := range chunkViews { - if offset < chunkView.LogicOffset { - if whence == SEEK_HOLE { - out = offset - } else { - out = chunkView.LogicOffset - } - - return true, out - } - - if offset >= chunkView.LogicOffset && offset < chunkView.Offset+int64(chunkView.Size) && whence == SEEK_DATA { - out = offset - - return true, out - } - - offset += int64(chunkView.Size) - } - - return -} diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go index 8375f9a5d..cedece137 100644 --- a/weed/mount/weedfs_file_read.go +++ b/weed/mount/weedfs_file_read.go @@ -1,7 +1,8 @@ package mount import ( - "context" + "bytes" + "fmt" "io" "github.com/hanwen/go-fuse/v2/fuse" @@ -40,8 +41,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse return nil, fuse.ENOENT } - fh.orderedMutex.Acquire(context.Background(), 1) - defer fh.orderedMutex.Release(1) + fh.Lock() + defer fh.Unlock() offset := int64(in.Offset) totalRead, err := readDataByFileHandle(buff, fh, offset) @@ -50,6 +51,23 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse return nil, fuse.EIO } + if IsDebugFileReadWrite { + // print(".") + mirrorData := make([]byte, totalRead) + fh.mirrorFile.ReadAt(mirrorData, offset) + if bytes.Compare(mirrorData, buff[:totalRead]) != 0 { + + againBuff := make([]byte, len(buff)) + againRead, _ := readDataByFileHandle(buff, fh, offset) + againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0 + againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0 + + fmt.Printf("\ncompare %v [%d,%d) size:%d againSame:%v againCorrect:%v\n", fh.mirrorFile.Name(), offset, offset+totalRead, totalRead, againSame, againCorrect) + //fmt.Printf("read mirrow data: %v\n", mirrorData) + //fmt.Printf("read actual data: %v\n", buff[:totalRead]) + } + } + return fuse.ReadResultData(buff[:totalRead]), fuse.OK } @@ -59,9 +77,9 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e fhIn.lockForRead(offset, size) defer fhIn.unlockForRead(offset, size) - n, err := fhIn.readFromChunks(buff, offset) + n, tsNs, err := fhIn.readFromChunks(buff, offset) if err == nil || err == io.EOF { - maxStop := fhIn.readFromDirtyPages(buff, offset) + maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs) n = max(maxStop-offset, n) } if err == io.EOF { diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 7b7c66680..ac18e05ea 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -89,8 +89,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu } func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { - fh.orderedMutex.Acquire(context.Background(), 1) - defer fh.orderedMutex.Release(1) + fh.Lock() + defer fh.Unlock() // flush works at fh level fileFullPath := fh.FullPath() @@ -145,9 +145,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { } glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.GetChunks())) - for i, chunk := range entry.GetChunks() { - glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) - } + //for i, chunk := range entry.GetChunks() { + // glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) + //} manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks()) @@ -158,6 +158,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { glog.V(0).Infof("MaybeManifestize: %v", manifestErr) } entry.Chunks = append(chunks, manifestChunks...) + fh.entryChunkGroup.SetChunks(entry.Chunks) wfs.mapPbIdFromLocalToFiler(request.Entry) defer wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -181,5 +182,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { return fuse.EIO } + if IsDebugFileReadWrite { + fh.mirrorFile.Sync() + } + return fuse.OK } diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 7b13d54ff..5a9a21ded 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -1,10 +1,10 @@ package mount import ( - "context" "github.com/hanwen/go-fuse/v2/fuse" "net/http" "syscall" + "time" ) /** @@ -46,8 +46,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size)) - fh.orderedMutex.Acquire(context.Background(), 1) - defer fh.orderedMutex.Release(1) + tsNs := time.Now().UnixNano() + + fh.Lock() + defer fh.Unlock() entry := fh.GetEntry() if entry == nil { @@ -59,7 +61,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode()) + fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs) written = uint32(len(data)) @@ -70,5 +72,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr fh.dirtyMetadata = true + if IsDebugFileReadWrite { + // print("+") + fh.mirrorFile.WriteAt(data, offset) + } + return written, fuse.OK } diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go index e18a4a358..4c8470245 100644 --- a/weed/mount/weedfs_write.go +++ b/weed/mount/weedfs_write.go @@ -13,7 +13,7 @@ import ( func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { - return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) { + return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { fileId, uploadResult, err, data := operation.UploadWithRetry( wfs, @@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun wfs.chunkCache.SetChunk(fileId, data) } - chunk = uploadResult.ToPbFileChunk(fileId, offset) + chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs) return chunk, nil } } |
