author    Chris Lu <chrislusf@users.noreply.github.com>  2023-01-02 23:20:45 -0800
committer GitHub <noreply@github.com>  2023-01-02 23:20:45 -0800
commit    d4566d4aaa426b33015780c7cc18f887fc07cca4 (patch)
tree      7c3b5cb3d9e54297b9d4213b67408f86149013f7
parent    367353b936c450906e88e850c7d1e804f97c3560 (diff)
more solid weed mount (#4089)
* compare chunks by timestamp
* fix slab clearing error
* fix test compilation
* move oldest chunk to sealed, instead of by fullness
* lock on fh.entryViewCache
* remove verbose logs
* revert slab clearing
* less logs
* less logs
* track write and read by timestamp
* remove useless logic
* add entry lock on file handle release
* use mem chunk only, swap file chunk has problems
* comment out code that may be used later
* add debug mode to compare data read and write
* more efficient readResolvedChunks with linked list
* small optimization
* fix test compilation
* minor fix on writer
* add SeparateGarbageChunks
* group chunks into sections
* turn off debug mode
* fix tests
* fix tests
* tmp enable swap file chunk
* Revert "tmp enable swap file chunk"
  This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7.
* simple refactoring
* simple refactoring
* do not re-use swap file chunk. Sealed chunks should not be re-used.
* comment out debugging facilities
* either mem chunk or swap file chunk is fine now
* remove orderedMutex, as *semaphore.Weighted was not found impactful
* optimize size calculation for changing large files
* optimize performance to avoid going through the long list of chunks
* still problems with swap file chunk
* rename
* tiny optimization
* swap file chunk saves only successfully read data
* fix
* enable both mem and swap file chunk
* resolve chunks with range
* rename
* fix chunk interval list
* also change file handle chunk group when adding chunks
* pick inactive chunk with time-decayed counter
* fix compilation
* avoid nil with empty fh.entry
* refactoring
* rename
* rename
* refactor visible intervals to *list.List
* refactor chunkViews to *list.List
* add IntervalList for generic interval list
* change visible interval to use IntervalList in generics
* change chunkViews to *IntervalList[*ChunkView]
* use NewFileChunkSection to create
* rename variables
* refactor
* fix renaming leftover
* renaming
* renaming
* add insert interval
* interval list adds lock
* incrementally add chunks to readers
  Fixes:
  1. set start and stop offset for the value object
  2. clone the value object
  3. use pointer instead of copy-by-value when passing to interval.Value
  4. use insert interval since adding chunk could be out of order
* fix tests compilation
* fix tests compilation
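
The centerpiece of this change is the generic IntervalList added in weed/filer/interval_list.go: a doubly linked list of [StartOffset, StopOffset) ranges, each stamped with a timestamp, that both VisibleInterval and ChunkView now plug into. A minimal sketch of the API, assuming the package path from this repository; demoValue is a hypothetical stand-in for the IntervalValue implementations in the diff:

    package main

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/filer"
    )

    // demoValue is a hypothetical IntervalValue implementation, modeled on
    // the IntervalInt helper in the new interval_list_test.go.
    type demoValue int

    func (d demoValue) SetStartStop(start, stop int64) {}
    func (d demoValue) Clone() filer.IntervalValue     { return d }

    func main() {
        list := filer.NewIntervalList[demoValue]()
        list.InsertInterval(0, 100, 1, demoValue(1))  // write [0,100) at ts=1
        list.InsertInterval(50, 150, 2, demoValue(2)) // newer write overlaps the tail
        for p := list.Front(); p != nil; p = p.Next {
            fmt.Printf("[%d,%d) ts=%d value=%d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
        }
        // prints [0,50) ts=1 value=1, then [50,150) ts=2 value=2
    }
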
-rw-r--r--weed/command/filer_copy.go8
-rw-r--r--weed/filer/filechunk_group.go148
-rw-r--r--weed/filer/filechunk_group_test.go36
-rw-r--r--weed/filer/filechunk_manifest.go4
-rw-r--r--weed/filer/filechunk_section.go119
-rw-r--r--weed/filer/filechunks.go248
-rw-r--r--weed/filer/filechunks_read.go106
-rw-r--r--weed/filer/filechunks_read_test.go86
-rw-r--r--weed/filer/filechunks_test.go214
-rw-r--r--weed/filer/filer_notify_append.go2
-rw-r--r--weed/filer/interval_list.go259
-rw-r--r--weed/filer/interval_list_test.go327
-rw-r--r--weed/filer/reader_at.go65
-rw-r--r--weed/filer/reader_at_test.go142
-rw-r--r--weed/filer/reader_cache.go7
-rw-r--r--weed/filer/stream.go90
-rw-r--r--weed/mount/dirty_pages_chunked.go16
-rw-r--r--weed/mount/filehandle.go109
-rw-r--r--weed/mount/filehandle_map.go4
-rw-r--r--weed/mount/filehandle_read.go43
-rw-r--r--weed/mount/page_writer.go12
-rw-r--r--weed/mount/page_writer/activity_score.go39
-rw-r--r--weed/mount/page_writer/chunk_interval_list.go83
-rw-r--r--weed/mount/page_writer/chunk_interval_list_test.go72
-rw-r--r--weed/mount/page_writer/dirty_pages.go4
-rw-r--r--weed/mount/page_writer/page_chunk.go8
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go31
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go125
-rw-r--r--weed/mount/page_writer/upload_pipeline.go40
-rw-r--r--weed/mount/page_writer/upload_pipeline_test.go4
-rw-r--r--weed/mount/weedfs_attr.go20
-rw-r--r--weed/mount/weedfs_file_copy_range.go12
-rw-r--r--weed/mount/weedfs_file_lseek.go43
-rw-r--r--weed/mount/weedfs_file_read.go28
-rw-r--r--weed/mount/weedfs_file_sync.go15
-rw-r--r--weed/mount/weedfs_file_write.go15
-rw-r--r--weed/mount/weedfs_write.go4
-rw-r--r--weed/operation/upload_content.go4
-rw-r--r--weed/replication/repl_util/replication_util.go7
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go4
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go2
-rw-r--r--weed/server/filer_server_handlers_write_upload.go2
-rw-r--r--weed/server/webdav_server.go30
-rw-r--r--weed/shell/command_fs_verify.go2
-rw-r--r--weed/shell/command_volume_fsck.go2
45 files changed, 1835 insertions, 806 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 667b089ed..0c4626317 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -365,7 +365,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if flushErr != nil {
return flushErr
}
- chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0))
+ chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano()))
}
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -450,7 +450,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error)
return
}
- chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize)
+ chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano())
fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
@@ -530,7 +530,7 @@ func detectMimeType(f *os.File) string {
return mimeType
}
-func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
+func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
worker,
@@ -561,7 +561,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
if uploadResult.Error != "" {
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
- return uploadResult.ToPbFileChunk(finalFileId, offset), nil
+ return uploadResult.ToPbFileChunk(finalFileId, offset, tsNs), nil
}
var _ = filer_pb.FilerClient(&FileCopyWorker{})
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
new file mode 100644
index 000000000..5dbf16a5c
--- /dev/null
+++ b/weed/filer/filechunk_group.go
@@ -0,0 +1,148 @@
+package filer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
+ "sync"
+)
+
+type ChunkGroup struct {
+ lookupFn wdclient.LookupFileIdFunctionType
+ chunkCache chunk_cache.ChunkCache
+ manifestChunks []*filer_pb.FileChunk
+ sections map[SectionIndex]*FileChunkSection
+ sectionsLock sync.RWMutex
+}
+
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+ group := &ChunkGroup{
+ lookupFn: lookupFn,
+ chunkCache: chunkCache,
+ sections: make(map[SectionIndex]*FileChunkSection),
+ }
+
+ err := group.SetChunks(chunks)
+ return group, err
+}
+
+func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
+
+ group.sectionsLock.Lock()
+ defer group.sectionsLock.Unlock()
+
+ sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
+ for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
+ section, found := group.sections[si]
+ if !found {
+ section = NewFileChunkSection(si)
+ group.sections[si] = section
+ }
+ section.addChunk(chunk)
+ }
+ return nil
+}
+
+func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
+
+ group.sectionsLock.RLock()
+ defer group.sectionsLock.RUnlock()
+
+ sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+ for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
+ section, found := group.sections[si]
+ rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+ if !found {
+ for i := rangeStart; i < rangeStop; i++ {
+ buff[i-offset] = 0
+ }
+ continue
+ }
+ xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart)
+ if xErr != nil {
+ err = xErr
+ }
+ n += xn
+ tsNs = max(tsNs, xTsNs)
+ }
+ return
+}
+
+func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
+ var dataChunks []*filer_pb.FileChunk
+ for _, chunk := range chunks {
+
+ if !chunk.IsChunkManifest {
+ dataChunks = append(dataChunks, chunk)
+ continue
+ }
+
+ resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk)
+ if err != nil {
+ return err
+ }
+
+ group.manifestChunks = append(group.manifestChunks, chunk)
+ dataChunks = append(dataChunks, resolvedChunks...)
+ }
+
+ for _, chunk := range dataChunks {
+ sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
+ for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
+ section, found := group.sections[si]
+ if !found {
+ section = NewFileChunkSection(si)
+ group.sections[si] = section
+ }
+ section.chunks = append(section.chunks, chunk)
+ }
+ }
+ return nil
+}
+
+const (
+ // see weedfs_file_lseek.go
+ SEEK_DATA uint32 = 3 // seek to next data after the offset
+ // SEEK_HOLE uint32 = 4 // seek to next hole after the offset
+)
+
+// FIXME: needs tests
+func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
+ group.sectionsLock.RLock()
+ defer group.sectionsLock.RUnlock()
+
+ return group.doSearchChunks(offset, fileSize, whence)
+}
+
+func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
+
+ sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize)
+ if whence == SEEK_DATA {
+ for si := sectionIndex; si < maxSectionIndex+1; si++ {
+ section, foundSection := group.sections[si]
+ if !foundSection {
+ continue
+ }
+ sectionStart := section.DataStartOffset(group, offset, fileSize)
+ if sectionStart == -1 {
+ continue
+ }
+ return true, sectionStart
+ }
+ return false, 0
+ } else {
+ // whence == SEEK_HOLE
+ for si := sectionIndex; si < maxSectionIndex; si++ {
+ section, foundSection := group.sections[si]
+ if !foundSection {
+ return true, offset
+ }
+ holeStart := section.NextStopOffset(group, offset, fileSize)
+ if holeStart%SectionSize == 0 {
+ continue
+ }
+ return true, holeStart
+ }
+ return true, fileSize
+ }
+}
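
ChunkGroup shards a file into fixed-size sections (SectionSize, defined in filechunk_section.go below, is 256MiB) and registers a chunk in every section its byte range touches; the loop bound si < sectionIndexStop+1 makes the stop index inclusive. A worked example of the index arithmetic, with hypothetical standalone values:

    const sectionSize = int64(2 * 1024 * 1024 * 128) // 256 MiB, as in the diff

    chunkOffset := int64(200 * 1024 * 1024) // chunk starts at 200 MiB
    chunkSize := int64(100 * 1024 * 1024)   // and spans 100 MiB

    first := chunkOffset / sectionSize              // 200 MiB / 256 MiB = 0
    last := (chunkOffset + chunkSize) / sectionSize // 300 MiB / 256 MiB = 1
    // the chunk crosses the 256 MiB boundary, so AddChunk registers it in
    // sections 0 and 1; note the stop index comes from the exclusive end,
    // so a chunk ending exactly on a boundary also touches the next index
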
diff --git a/weed/filer/filechunk_group_test.go b/weed/filer/filechunk_group_test.go
new file mode 100644
index 000000000..d24d66a49
--- /dev/null
+++ b/weed/filer/filechunk_group_test.go
@@ -0,0 +1,36 @@
+package filer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestChunkGroup_doSearchChunks(t *testing.T) {
+ type fields struct {
+ sections map[SectionIndex]*FileChunkSection
+ }
+ type args struct {
+ offset int64
+ fileSize int64
+ whence uint32
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ wantFound bool
+ wantOut int64
+ }{
+ // TODO: Add test cases.
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ group := &ChunkGroup{
+ sections: tt.fields.sections,
+ }
+ gotFound, gotOut := group.doSearchChunks(tt.args.offset, tt.args.fileSize, tt.args.whence)
+ assert.Equalf(t, tt.wantFound, gotFound, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
+ assert.Equalf(t, tt.wantOut, gotOut, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
+ })
+ }
+}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 221a11ffe..d9d0331be 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -264,7 +264,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
}
}
- manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0)
+ manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
if err != nil {
return nil, err
}
@@ -275,4 +275,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
return
}
-type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error)
+type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)
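
The extra tsNs parameter threads the writer-assigned modification time through every save path, so chunk ordering no longer depends on upload completion order (mergeIntoManifest above passes tsNs=0 for the manifest chunk itself). A hedged sketch of a function satisfying the new signature; the body is illustrative, not the filer's actual upload path:

    var saveFn SaveDataAsChunkFunctionType = func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
        // upload the bytes somewhere, then stamp the chunk with the
        // caller-provided timestamp instead of the upload completion time
        return &filer_pb.FileChunk{Offset: offset, ModifiedTsNs: tsNs}, nil
    }

    _, _ = saveFn(bytes.NewReader([]byte("payload")), "", 0, time.Now().UnixNano())
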
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
new file mode 100644
index 000000000..60c919569
--- /dev/null
+++ b/weed/filer/filechunk_section.go
@@ -0,0 +1,119 @@
+package filer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "sync"
+)
+
+const SectionSize = 2 * 1024 * 1024 * 128 // 256MiB
+type SectionIndex int64
+type FileChunkSection struct {
+ sectionIndex SectionIndex
+ chunks []*filer_pb.FileChunk
+ visibleIntervals *IntervalList[*VisibleInterval]
+ chunkViews *IntervalList[*ChunkView]
+ reader *ChunkReadAt
+ lock sync.Mutex
+}
+
+func NewFileChunkSection(si SectionIndex) *FileChunkSection {
+ return &FileChunkSection{
+ sectionIndex: si,
+ }
+}
+
+func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
+ section.lock.Lock()
+ defer section.lock.Unlock()
+
+ start, stop := max(int64(section.sectionIndex)*SectionSize, chunk.Offset), min(((int64(section.sectionIndex)+1)*SectionSize), chunk.Offset+int64(chunk.Size))
+
+ section.chunks = append(section.chunks, chunk)
+
+ if section.visibleIntervals != nil {
+ MergeIntoVisibles(section.visibleIntervals, start, stop, chunk)
+ }
+
+ if section.visibleIntervals != nil {
+ section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
+ }
+
+ if section.chunkViews != nil {
+ MergeIntoChunkViews(section.chunkViews, start, stop, chunk)
+ }
+
+ return nil
+}
+
+func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
+ if section.visibleIntervals == nil {
+ section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
+ section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
+ if section.reader != nil {
+ _ = section.reader.Close()
+ section.reader = nil
+ }
+ }
+ if section.chunkViews == nil {
+ section.chunkViews = ViewFromVisibleIntervals(section.visibleIntervals, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
+ }
+
+ if section.reader == nil {
+ section.reader = NewChunkReaderAtFromClient(group.lookupFn, section.chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
+ }
+ section.reader.fileSize = fileSize
+}
+
+func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
+ section.lock.Lock()
+ defer section.lock.Unlock()
+
+ section.setupForRead(group, fileSize)
+
+ return section.reader.ReadAtWithTime(buff, offset)
+}
+
+func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
+ section.lock.Lock()
+ defer section.lock.Unlock()
+
+ section.setupForRead(group, fileSize)
+
+ for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
+ visible := x.Value
+ if visible.stop <= offset {
+ continue
+ }
+ if offset < visible.start {
+ return offset
+ }
+ return offset
+ }
+ return -1
+}
+
+func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
+ section.lock.Lock()
+ defer section.lock.Unlock()
+
+ section.setupForRead(group, fileSize)
+
+ isAfterOffset := false
+ for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
+ visible := x.Value
+ if !isAfterOffset {
+ if visible.stop <= offset {
+ continue
+ }
+ isAfterOffset = true
+ }
+ if offset < visible.start {
+ return offset
+ }
+ // now visible.start <= offset
+ if offset < visible.stop {
+ offset = visible.stop
+ }
+ }
+ return offset
+}
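
NextStopOffset extends offset across contiguous visible data inside the section, so the next hole for SEEK_HOLE begins where coverage stops; DataStartOffset is its SEEK_DATA counterpart. A standalone sketch of the same walk over plain [start,stop) pairs sorted by start, as a hypothetical helper that is not part of the diff:

    func nextStopOffset(visibles [][2]int64, offset int64) int64 {
        isAfterOffset := false
        for _, v := range visibles {
            start, stop := v[0], v[1]
            if !isAfterOffset {
                if stop <= offset {
                    continue // interval entirely before the offset
                }
                isAfterOffset = true
            }
            if offset < start {
                return offset // offset already sits in a hole
            }
            if offset < stop {
                offset = stop // extend past contiguous data
            }
        }
        return offset
    }

    // nextStopOffset([][2]int64{{0, 100}, {100, 250}}, 0) == 250: data runs
    // contiguously to 250, so the hole starts there; with intervals
    // {{0, 100}, {150, 250}} it returns 100, the start of the gap.
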
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 061e0757a..d872bd22d 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
- "golang.org/x/exp/slices"
"math"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -42,7 +41,7 @@ func ETag(entry *filer_pb.Entry) (etag string) {
}
func ETagEntry(entry *Entry) (etag string) {
- if entry.IsInRemoteOnly() {
+ if entry.IsInRemoteOnly() {
return entry.Remote.RemoteETag
}
if entry.Attr.Md5 == nil {
@@ -66,8 +65,15 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ compacted, garbage = SeparateGarbageChunks(visibles, chunks)
+
+ return
+}
+
+func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
- for _, interval := range visibles {
+ for x := visibles.Front(); x != nil; x = x.Next {
+ interval := x.Value
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
@@ -77,8 +83,7 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks
garbage = append(garbage, chunk)
}
}
-
- return
+ return compacted, garbage
}
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
@@ -131,20 +136,39 @@ func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_p
}
type ChunkView struct {
- FileId string
- Offset int64
- Size uint64
- LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
- ChunkSize uint64
- CipherKey []byte
- IsGzipped bool
+ FileId string
+ OffsetInChunk int64 // offset within the chunk
+ ViewSize uint64
+ ViewOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
+ ChunkSize uint64
+ CipherKey []byte
+ IsGzipped bool
+ ModifiedTsNs int64
+}
+
+func (cv *ChunkView) SetStartStop(start, stop int64) {
+ cv.OffsetInChunk += start - cv.ViewOffset
+ cv.ViewOffset = start
+ cv.ViewSize = uint64(stop - start)
+}
+func (cv *ChunkView) Clone() IntervalValue {
+ return &ChunkView{
+ FileId: cv.FileId,
+ OffsetInChunk: cv.OffsetInChunk,
+ ViewSize: cv.ViewSize,
+ ViewOffset: cv.ViewOffset,
+ ChunkSize: cv.ChunkSize,
+ CipherKey: cv.CipherKey,
+ IsGzipped: cv.IsGzipped,
+ ModifiedTsNs: cv.ModifiedTsNs,
+ }
}
func (cv *ChunkView) IsFullChunk() bool {
- return cv.Size == cv.ChunkSize
+ return cv.ViewSize == cv.ChunkSize
}
-func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
@@ -152,7 +176,7 @@ func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*
}
-func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) {
+func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
stop := offset + size
if size == math.MaxInt64 {
@@ -162,164 +186,112 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
stop = math.MaxInt64
}
- for _, chunk := range visibles {
+ chunkViews = NewIntervalList[*ChunkView]()
+ for x := visibles.Front(); x != nil; x = x.Next {
+ chunk := x.Value
chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
if chunkStart < chunkStop {
- views = append(views, &ChunkView{
- FileId: chunk.fileId,
- Offset: chunkStart - chunk.start + chunk.chunkOffset,
- Size: uint64(chunkStop - chunkStart),
- LogicOffset: chunkStart,
- ChunkSize: chunk.chunkSize,
- CipherKey: chunk.cipherKey,
- IsGzipped: chunk.isGzipped,
+ chunkView := &ChunkView{
+ FileId: chunk.fileId,
+ OffsetInChunk: chunkStart - chunk.start + chunk.offsetInChunk,
+ ViewSize: uint64(chunkStop - chunkStart),
+ ViewOffset: chunkStart,
+ ChunkSize: chunk.chunkSize,
+ CipherKey: chunk.cipherKey,
+ IsGzipped: chunk.isGzipped,
+ ModifiedTsNs: chunk.modifiedTsNs,
+ }
+ chunkViews.AppendInterval(&Interval[*ChunkView]{
+ StartOffset: chunkStart,
+ StopOffset: chunkStop,
+ TsNs: chunk.modifiedTsNs,
+ Value: chunkView,
+ Prev: nil,
+ Next: nil,
})
}
}
- return views
+ return chunkViews
}
-func logPrintf(name string, visibles []VisibleInterval) {
-
- /*
- glog.V(0).Infof("%s len %d", name, len(visibles))
- for _, v := range visibles {
- glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset)
- }
- */
-}
-
-func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) {
-
- newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.ModifiedTsNs, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
-
- length := len(visibles)
- if length == 0 {
- return append(visibles, newV)
- }
- last := visibles[length-1]
- if last.stop <= chunk.Offset {
- return append(visibles, newV)
+func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk) {
+
+ newV := &VisibleInterval{
+ start: start,
+ stop: stop,
+ fileId: chunk.GetFileIdString(),
+ modifiedTsNs: chunk.ModifiedTsNs,
+ offsetInChunk: start - chunk.Offset, // the starting position in the chunk
+ chunkSize: chunk.Size, // size of the chunk
+ cipherKey: chunk.CipherKey,
+ isGzipped: chunk.IsCompressed,
}
- logPrintf(" before", visibles)
- // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size)
- chunkStop := chunk.Offset + int64(chunk.Size)
- for _, v := range visibles {
- if v.start < chunk.Offset && chunk.Offset < v.stop {
- t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTsNs, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)
- newVisibles = append(newVisibles, t)
- // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
- }
- if v.start < chunkStop && chunkStop < v.stop {
- t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTsNs, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)
- newVisibles = append(newVisibles, t)
- // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
- }
- if chunkStop <= v.start || v.stop <= chunk.Offset {
- newVisibles = append(newVisibles, v)
- // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop)
- }
- }
- newVisibles = append(newVisibles, newV)
-
- logPrintf(" append", newVisibles)
+ visibles.InsertInterval(start, stop, chunk.ModifiedTsNs, newV)
+}
- for i := len(newVisibles) - 1; i >= 0; i-- {
- if i > 0 && newV.start < newVisibles[i-1].start {
- newVisibles[i] = newVisibles[i-1]
- } else {
- newVisibles[i] = newV
- break
- }
+func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, chunk *filer_pb.FileChunk) {
+
+ chunkView := &ChunkView{
+ FileId: chunk.GetFileIdString(),
+ OffsetInChunk: start - chunk.Offset,
+ ViewSize: uint64(stop - start),
+ ViewOffset: start,
+ ChunkSize: chunk.Size,
+ CipherKey: chunk.CipherKey,
+ IsGzipped: chunk.IsCompressed,
+ ModifiedTsNs: chunk.ModifiedTsNs,
}
- logPrintf(" sorted", newVisibles)
- return newVisibles
+ chunkViews.InsertInterval(start, stop, chunk.ModifiedTsNs, chunkView)
}
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
if err != nil {
return
}
- visibles2 := readResolvedChunks(chunks)
-
- if true {
- return visibles2, err
- }
- slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
- if a.ModifiedTsNs == b.ModifiedTsNs {
- filer_pb.EnsureFid(a)
- filer_pb.EnsureFid(b)
- if a.Fid == nil || b.Fid == nil {
- return true
- }
- return a.Fid.FileKey < b.Fid.FileKey
- }
- return a.ModifiedTsNs < b.ModifiedTsNs
- })
- for _, chunk := range chunks {
-
- // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
- visibles = MergeIntoVisibles(visibles, chunk)
-
- logPrintf("add", visibles)
+ visibles2 := readResolvedChunks(chunks, 0, math.MaxInt64)
- }
-
- if len(visibles) != len(visibles2) {
- fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2))
- } else {
- for i := 0; i < len(visibles); i++ {
- checkDifference(visibles[i], visibles2[i])
- }
- }
-
- return
-}
-
-func checkDifference(x, y VisibleInterval) {
- if x.start != y.start ||
- x.stop != y.stop ||
- x.fileId != y.fileId ||
- x.modifiedTsNs != y.modifiedTsNs {
- fmt.Printf("different visible %+v : %+v\n", x, y)
- }
+ return visibles2, err
}
// find non-overlapping visible intervals
// visible interval map to one file chunk
type VisibleInterval struct {
- start int64
- stop int64
- modifiedTsNs int64
- fileId string
- chunkOffset int64
- chunkSize uint64
- cipherKey []byte
- isGzipped bool
+ start int64
+ stop int64
+ modifiedTsNs int64
+ fileId string
+ offsetInChunk int64
+ chunkSize uint64
+ cipherKey []byte
+ isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
- return VisibleInterval{
- start: start,
- stop: stop,
- fileId: fileId,
- modifiedTsNs: modifiedTime,
- chunkOffset: chunkOffset, // the starting position in the chunk
- chunkSize: chunkSize,
- cipherKey: cipherKey,
- isGzipped: isGzipped,
+func (v *VisibleInterval) SetStartStop(start, stop int64) {
+ v.offsetInChunk += start - v.start
+ v.start, v.stop = start, stop
+}
+func (v *VisibleInterval) Clone() IntervalValue {
+ return &VisibleInterval{
+ start: v.start,
+ stop: v.stop,
+ modifiedTsNs: v.modifiedTsNs,
+ fileId: v.fileId,
+ offsetInChunk: v.offsetInChunk,
+ chunkSize: v.chunkSize,
+ cipherKey: v.cipherKey,
+ isGzipped: v.isGzipped,
}
}
diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go
index 8a15f6e7a..8b2d36e12 100644
--- a/weed/filer/filechunks_read.go
+++ b/weed/filer/filechunks_read.go
@@ -1,14 +1,22 @@
package filer
import (
+ "container/list"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"golang.org/x/exp/slices"
)
-func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+func readResolvedChunks(chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval]) {
var points []*Point
for _, chunk := range chunks {
+ if chunk.IsChunkManifest {
+ println("This should not happen! A manifest chunk found:", chunk.GetFileIdString())
+ }
+ start, stop := max(chunk.Offset, startOffset), min(chunk.Offset+int64(chunk.Size), stopOffset)
+ if start >= stop {
+ continue
+ }
points = append(points, &Point{
x: chunk.Offset,
ts: chunk.ModifiedTsNs,
@@ -33,40 +41,45 @@ func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterva
})
var prevX int64
- var queue []*Point
+ queue := list.New() // points with higher ts are at the tail
+ visibles = NewIntervalList[*VisibleInterval]()
+ var prevPoint *Point
for _, point := range points {
+ if queue.Len() > 0 {
+ prevPoint = queue.Back().Value.(*Point)
+ } else {
+ prevPoint = nil
+ }
if point.isStart {
- if len(queue) > 0 {
- lastIndex := len(queue) - 1
- lastPoint := queue[lastIndex]
- if point.x != prevX && lastPoint.ts < point.ts {
- visibles = addToVisibles(visibles, prevX, lastPoint, point)
+ if prevPoint != nil {
+ if point.x != prevX && prevPoint.ts < point.ts {
+ addToVisibles(visibles, prevX, prevPoint, point)
prevX = point.x
}
}
// insert into queue
- for i := len(queue); i >= 0; i-- {
- if i == 0 || queue[i-1].ts <= point.ts {
- if i == len(queue) {
- prevX = point.x
+ if prevPoint == nil || prevPoint.ts < point.ts {
+ queue.PushBack(point)
+ prevX = point.x
+ } else {
+ for e := queue.Front(); e != nil; e = e.Next() {
+ if e.Value.(*Point).ts > point.ts {
+ queue.InsertBefore(point, e)
+ break
}
- queue = addToQueue(queue, i, point)
- break
}
}
} else {
- lastIndex := len(queue) - 1
- index := lastIndex
- var startPoint *Point
- for ; index >= 0; index-- {
- startPoint = queue[index]
- if startPoint.ts == point.ts {
- queue = removeFromQueue(queue, index)
+ isLast := true
+ for e := queue.Back(); e != nil; e = e.Prev() {
+ if e.Value.(*Point).ts == point.ts {
+ queue.Remove(e)
break
}
+ isLast = false
}
- if index == lastIndex && startPoint != nil {
- visibles = addToVisibles(visibles, prevX, startPoint, point)
+ if isLast && prevPoint != nil {
+ addToVisibles(visibles, prevX, prevPoint, point)
prevX = point.x
}
}
@@ -75,37 +88,30 @@ func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterva
return
}
-func removeFromQueue(queue []*Point, index int) []*Point {
- for i := index; i < len(queue)-1; i++ {
- queue[i] = queue[i+1]
- }
- queue = queue[:len(queue)-1]
- return queue
-}
-
-func addToQueue(queue []*Point, index int, point *Point) []*Point {
- queue = append(queue, point)
- for i := len(queue) - 1; i > index; i-- {
- queue[i], queue[i-1] = queue[i-1], queue[i]
- }
- return queue
-}
-
-func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval {
+func addToVisibles(visibles *IntervalList[*VisibleInterval], prevX int64, startPoint *Point, point *Point) {
if prevX < point.x {
chunk := startPoint.chunk
- visibles = append(visibles, VisibleInterval{
- start: prevX,
- stop: point.x,
- fileId: chunk.GetFileIdString(),
- modifiedTsNs: chunk.ModifiedTsNs,
- chunkOffset: prevX - chunk.Offset,
- chunkSize: chunk.Size,
- cipherKey: chunk.CipherKey,
- isGzipped: chunk.IsCompressed,
- })
+ visible := &VisibleInterval{
+ start: prevX,
+ stop: point.x,
+ fileId: chunk.GetFileIdString(),
+ modifiedTsNs: chunk.ModifiedTsNs,
+ offsetInChunk: prevX - chunk.Offset,
+ chunkSize: chunk.Size,
+ cipherKey: chunk.CipherKey,
+ isGzipped: chunk.IsCompressed,
+ }
+ appendVisibleInterval(visibles, visible)
}
- return visibles
+}
+
+func appendVisibleInterval(visibles *IntervalList[*VisibleInterval], visible *VisibleInterval) {
+ visibles.AppendInterval(&Interval[*VisibleInterval]{
+ StartOffset: visible.start,
+ StopOffset: visible.stop,
+ TsNs: visible.modifiedTsNs,
+ Value: visible,
+ })
}
type Point struct {
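
readResolvedChunks is a sweep line over chunk start/stop points: the container/list queue keeps the currently overlapping chunks ordered by ModifiedTsNs with the newest at the tail, and a visible interval is emitted whenever the newest element changes. A minimal sketch of the resolution it produces, written as if inside package filer (the function is unexported); the chunk values are illustrative:

    chunks := []*filer_pb.FileChunk{
        {FileId: "a", Offset: 0, Size: 100, ModifiedTsNs: 1},
        {FileId: "b", Offset: 50, Size: 100, ModifiedTsNs: 2},
    }
    visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
    for x := visibles.Front(); x != nil; x = x.Next {
        v := x.Value
        fmt.Printf("[%d,%d) %s\n", v.start, v.stop, v.fileId)
    }
    // prints [0,50) a then [50,150) b: the newer write wins in the overlap
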
diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go
index d4bfca72e..c66a874bc 100644
--- a/weed/filer/filechunks_read_test.go
+++ b/weed/filer/filechunks_read_test.go
@@ -3,6 +3,7 @@ package filer
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "math"
"math/rand"
"testing"
)
@@ -42,9 +43,38 @@ func TestReadResolvedChunks(t *testing.T) {
},
}
- visibles := readResolvedChunks(chunks)
+ visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
- for _, visible := range visibles {
+ fmt.Printf("resolved to %d visible intervales\n", visibles.Len())
+ for x := visibles.Front(); x != nil; x = x.Next {
+ visible := x.Value
+ fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
+ }
+
+}
+
+func TestReadResolvedChunks2(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {
+ FileId: "c",
+ Offset: 200,
+ Size: 50,
+ ModifiedTsNs: 3,
+ },
+ {
+ FileId: "e",
+ Offset: 200,
+ Size: 25,
+ ModifiedTsNs: 5,
+ },
+ }
+
+ visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
+
+ fmt.Printf("resolved to %d visible intervales\n", visibles.Len())
+ for x := visibles.Front(); x != nil; x = x.Next {
+ visible := x.Value
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
}
@@ -72,9 +102,10 @@ func TestRandomizedReadResolvedChunks(t *testing.T) {
chunks = append(chunks, randomWrite(array, start, size, ts))
}
- visibles := readResolvedChunks(chunks)
+ visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
- for _, visible := range visibles {
+ for x := visibles.Front(); x != nil; x = x.Next {
+ visible := x.Value
for i := visible.start; i < visible.stop; i++ {
if array[i] != visible.modifiedTsNs {
t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTsNs)
@@ -112,9 +143,9 @@ func TestSequentialReadResolvedChunks(t *testing.T) {
})
}
- visibles := readResolvedChunks(chunks)
+ visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
- fmt.Printf("visibles %d", len(visibles))
+ fmt.Printf("visibles %d", visibles.Len())
}
@@ -201,9 +232,48 @@ func TestActualReadResolvedChunks(t *testing.T) {
},
}
- visibles := readResolvedChunks(chunks)
+ visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
+
+ for x := visibles.Front(); x != nil; x = x.Next {
+ visible := x.Value
+ fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
+ }
+
+}
+
+func TestActualReadResolvedChunks2(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {
+ FileId: "1,e7b96fef48",
+ Offset: 0,
+ Size: 184320,
+ ModifiedTsNs: 1,
+ },
+ {
+ FileId: "2,22562640b9",
+ Offset: 184320,
+ Size: 4096,
+ ModifiedTsNs: 2,
+ },
+ {
+ FileId: "2,33562640b9",
+ Offset: 184320,
+ Size: 4096,
+ ModifiedTsNs: 4,
+ },
+ {
+ FileId: "4,df033e0fe4",
+ Offset: 188416,
+ Size: 2097152,
+ ModifiedTsNs: 3,
+ },
+ }
+
+ visibles := readResolvedChunks(chunks, 0, math.MaxInt64)
- for _, visible := range visibles {
+ for x := visibles.Front(); x != nil; x = x.Next {
+ visible := x.Value
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs)
}
diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go
index d29e0a600..b448950a9 100644
--- a/weed/filer/filechunks_test.go
+++ b/weed/filer/filechunks_test.go
@@ -92,7 +92,8 @@ func TestRandomFileChunksCompact(t *testing.T) {
visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
- for _, v := range visibles {
+ for visible := visibles.Front(); visible != nil; visible = visible.Next {
+ v := visible.Value
for x := v.start; x < v.stop; x++ {
assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId)
}
@@ -137,7 +138,7 @@ func TestIntervalMerging(t *testing.T) {
},
Expected: []*VisibleInterval{
{start: 0, stop: 70, fileId: "b"},
- {start: 70, stop: 100, fileId: "a", chunkOffset: 70},
+ {start: 70, stop: 100, fileId: "a", offsetInChunk: 70},
},
},
// case 3: updates overwrite full chunks
@@ -174,15 +175,15 @@ func TestIntervalMerging(t *testing.T) {
},
Expected: []*VisibleInterval{
{start: 0, stop: 200, fileId: "d"},
- {start: 200, stop: 220, fileId: "c", chunkOffset: 130},
+ {start: 200, stop: 220, fileId: "c", offsetInChunk: 130},
},
},
// case 6: same updates
{
Chunks: []*filer_pb.FileChunk{
{Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, ModifiedTsNs: 123},
- {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 123},
- {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 123},
+ {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 124},
+ {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 125},
},
Expected: []*VisibleInterval{
{start: 0, stop: 100, fileId: "xyz"},
@@ -228,11 +229,17 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
- for x, interval := range intervals {
- log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
- i, x, interval.start, interval.stop, interval.fileId)
+ x := -1
+ for visible := intervals.Front(); visible != nil; visible = visible.Next {
+ x++
+ interval := visible.Value
+ log.Printf("test case %d, interval start=%d, stop=%d, fileId=%s",
+ i, interval.start, interval.stop, interval.fileId)
}
- for x, interval := range intervals {
+ x = -1
+ for visible := intervals.Front(); visible != nil; visible = visible.Next {
+ x++
+ interval := visible.Value
if interval.start != testcase.Expected[x].start {
t.Fatalf("failed on test case %d, interval %d, start %d, expect %d",
i, x, interval.start, testcase.Expected[x].start)
@@ -245,13 +252,13 @@ func TestIntervalMerging(t *testing.T) {
t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
i, x, interval.fileId, testcase.Expected[x].fileId)
}
- if interval.chunkOffset != testcase.Expected[x].chunkOffset {
- t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d",
- i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset)
+ if interval.offsetInChunk != testcase.Expected[x].offsetInChunk {
+ t.Fatalf("failed on test case %d, interval %d, offsetInChunk %d, expect %d",
+ i, x, interval.offsetInChunk, testcase.Expected[x].offsetInChunk)
}
}
- if len(intervals) != len(testcase.Expected) {
- t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
+ if intervals.Len() != len(testcase.Expected) {
+ t.Fatalf("failed to compact test case %d, len %d expected %d", i, intervals.Len(), len(testcase.Expected))
}
}
@@ -276,9 +283,9 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 250,
Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
- {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200},
+ {OffsetInChunk: 0, ViewSize: 100, FileId: "abc", ViewOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 100},
+ {OffsetInChunk: 0, ViewSize: 50, FileId: "fsad", ViewOffset: 200},
},
},
// case 1: updates overwrite full chunks
@@ -290,7 +297,7 @@ func TestChunksReading(t *testing.T) {
Offset: 50,
Size: 100,
Expected: []*ChunkView{
- {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50},
+ {OffsetInChunk: 50, ViewSize: 100, FileId: "asdf", ViewOffset: 50},
},
},
// case 2: updates overwrite part of previous chunks
@@ -302,8 +309,8 @@ func TestChunksReading(t *testing.T) {
Offset: 30,
Size: 40,
Expected: []*ChunkView{
- {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30},
- {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60},
+ {OffsetInChunk: 20, ViewSize: 30, FileId: "b", ViewOffset: 30},
+ {OffsetInChunk: 57, ViewSize: 10, FileId: "a", ViewOffset: 60},
},
},
// case 3: updates overwrite full chunks
@@ -316,8 +323,8 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 200,
Expected: []*ChunkView{
- {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50},
+ {OffsetInChunk: 0, ViewSize: 50, FileId: "asdf", ViewOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 150, FileId: "xxxx", ViewOffset: 50},
},
},
// case 4: updates far away from prev chunks
@@ -330,8 +337,8 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 400,
Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250},
+ {OffsetInChunk: 0, ViewSize: 200, FileId: "asdf", ViewOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 150, FileId: "xxxx", ViewOffset: 250},
},
},
// case 5: updates overwrite full chunks
@@ -345,21 +352,21 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 220,
Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0},
- {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200},
+ {OffsetInChunk: 0, ViewSize: 200, FileId: "c", ViewOffset: 0},
+ {OffsetInChunk: 130, ViewSize: 20, FileId: "b", ViewOffset: 200},
},
},
// case 6: same updates
{
Chunks: []*filer_pb.FileChunk{
{Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, ModifiedTsNs: 123},
- {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 123},
- {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 123},
+ {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 124},
+ {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 125},
},
Offset: 0,
Size: 100,
Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 100, FileId: "xyz", ViewOffset: 0},
},
},
// case 7: edge cases
@@ -372,8 +379,8 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 200,
Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
+ {OffsetInChunk: 0, ViewSize: 100, FileId: "abc", ViewOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 100},
},
},
// case 8: edge cases
@@ -386,9 +393,9 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 300,
Expected: []*ChunkView{
- {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90},
- {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190},
+ {OffsetInChunk: 0, ViewSize: 90, FileId: "abc", ViewOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 90},
+ {OffsetInChunk: 0, ViewSize: 110, FileId: "fsad", ViewOffset: 190},
},
},
// case 9: edge cases
@@ -404,12 +411,12 @@ func TestChunksReading(t *testing.T) {
Offset: 0,
Size: 153578836,
Expected: []*ChunkView{
- {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0},
- {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936},
- {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760},
- {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736},
- {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168},
- {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248},
+ {OffsetInChunk: 0, ViewSize: 43175936, FileId: "2,111fc2cbfac1", ViewOffset: 0},
+ {OffsetInChunk: 0, ViewSize: 52981760 - 43175936, FileId: "2,112a36ea7f85", ViewOffset: 43175936},
+ {OffsetInChunk: 0, ViewSize: 72564736 - 52981760, FileId: "4,112d5f31c5e7", ViewOffset: 52981760},
+ {OffsetInChunk: 0, ViewSize: 133255168 - 72564736, FileId: "1,113245f0cdb6", ViewOffset: 72564736},
+ {OffsetInChunk: 0, ViewSize: 137269248 - 133255168, FileId: "3,1141a70733b5", ViewOffset: 133255168},
+ {OffsetInChunk: 0, ViewSize: 153578836 - 137269248, FileId: "1,114201d5bbdb", ViewOffset: 137269248},
},
},
}
@@ -420,28 +427,31 @@ func TestChunksReading(t *testing.T) {
}
log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
- for x, chunk := range chunks {
+ x := -1
+ for c := chunks.Front(); c != nil; c = c.Next {
+ x++
+ chunk := c.Value
log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
- i, x, chunk.Offset, chunk.Size, chunk.FileId)
- if chunk.Offset != testcase.Expected[x].Offset {
+ i, x, chunk.OffsetInChunk, chunk.ViewSize, chunk.FileId)
+ if chunk.OffsetInChunk != testcase.Expected[x].OffsetInChunk {
t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d",
- i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset)
+ i, chunk.FileId, chunk.OffsetInChunk, testcase.Expected[x].OffsetInChunk)
}
- if chunk.Size != testcase.Expected[x].Size {
- t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d",
- i, chunk.FileId, chunk.Size, testcase.Expected[x].Size)
+ if chunk.ViewSize != testcase.Expected[x].ViewSize {
+ t.Fatalf("failed on read case %d, chunk %s, ViewSize %d, expect %d",
+ i, chunk.FileId, chunk.ViewSize, testcase.Expected[x].ViewSize)
}
if chunk.FileId != testcase.Expected[x].FileId {
t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
i, x, chunk.FileId, testcase.Expected[x].FileId)
}
- if chunk.LogicOffset != testcase.Expected[x].LogicOffset {
- t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d",
- i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset)
+ if chunk.ViewOffset != testcase.Expected[x].ViewOffset {
+ t.Fatalf("failed on read case %d, chunk %d, ViewOffset %d, expect %d",
+ i, x, chunk.ViewOffset, testcase.Expected[x].ViewOffset)
}
}
- if len(chunks) != len(testcase.Expected) {
- t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected))
+ if chunks.Len() != len(testcase.Expected) {
+ t.Fatalf("failed to read test case %d, len %d expected %d", i, chunks.Len(), len(testcase.Expected))
}
}
@@ -467,73 +477,79 @@ func BenchmarkCompactFileChunks(b *testing.B) {
}
}
+func addVisibleInterval(visibles *IntervalList[*VisibleInterval], x *VisibleInterval) {
+ visibles.AppendInterval(&Interval[*VisibleInterval]{
+ StartOffset: x.start,
+ StopOffset: x.stop,
+ TsNs: x.modifiedTsNs,
+ Value: x,
+ })
+}
+
func TestViewFromVisibleIntervals(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 0,
- stop: 25,
- fileId: "fid1",
- },
- {
- start: 4096,
- stop: 8192,
- fileId: "fid2",
- },
- {
- start: 16384,
- stop: 18551,
- fileId: "fid3",
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 0,
+ stop: 25,
+ fileId: "fid1",
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 4096,
+ stop: 8192,
+ fileId: "fid2",
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 16384,
+ stop: 18551,
+ fileId: "fid3",
+ })
views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
- if len(views) != len(visibles) {
- assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+ if views.Len() != visibles.Len() {
+ assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error")
}
}
func TestViewFromVisibleIntervals2(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 344064,
- stop: 348160,
- fileId: "fid1",
- },
- {
- start: 348160,
- stop: 356352,
- fileId: "fid2",
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 344064,
+ stop: 348160,
+ fileId: "fid1",
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 348160,
+ stop: 356352,
+ fileId: "fid2",
+ })
views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
- if len(views) != len(visibles) {
- assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+ if views.Len() != visibles.Len() {
+ assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error")
}
}
func TestViewFromVisibleIntervals3(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 1000,
- stop: 2000,
- fileId: "fid1",
- },
- {
- start: 3000,
- stop: 4000,
- fileId: "fid2",
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 1000,
+ stop: 2000,
+ fileId: "fid1",
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 3000,
+ stop: 4000,
+ fileId: "fid2",
+ })
views := ViewFromVisibleIntervals(visibles, 1700, 1500)
- if len(views) != len(visibles) {
- assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+ if views.Len() != visibles.Len() {
+ assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error")
}
}
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 5c03d4f16..55278c492 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -40,7 +40,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
}
// append to existing chunks
- entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset))
+ entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano()))
// update the entry
err = f.CreateEntry(context.Background(), entry, false, false, nil, false)
diff --git a/weed/filer/interval_list.go b/weed/filer/interval_list.go
new file mode 100644
index 000000000..b3d2a76b9
--- /dev/null
+++ b/weed/filer/interval_list.go
@@ -0,0 +1,259 @@
+package filer
+
+import (
+ "math"
+ "sync"
+)
+
+type IntervalValue interface {
+ SetStartStop(start, stop int64)
+ Clone() IntervalValue
+}
+
+type Interval[T IntervalValue] struct {
+ StartOffset int64
+ StopOffset int64
+ TsNs int64
+ Value T
+ Prev *Interval[T]
+ Next *Interval[T]
+}
+
+func (interval *Interval[T]) Size() int64 {
+ return interval.StopOffset - interval.StartOffset
+}
+
+// IntervalList mark written intervals within one page chunk
+type IntervalList[T IntervalValue] struct {
+ head *Interval[T]
+ tail *Interval[T]
+ Lock sync.Mutex
+}
+
+func NewIntervalList[T IntervalValue]() *IntervalList[T] {
+ list := &IntervalList[T]{
+ head: &Interval[T]{
+ StartOffset: -1,
+ StopOffset: -1,
+ },
+ tail: &Interval[T]{
+ StartOffset: math.MaxInt64,
+ StopOffset: math.MaxInt64,
+ },
+ }
+ return list
+}
+
+func (list *IntervalList[T]) Front() (interval *Interval[T]) {
+ return list.head.Next
+}
+
+func (list *IntervalList[T]) AppendInterval(interval *Interval[T]) {
+ list.Lock.Lock()
+ defer list.Lock.Unlock()
+
+ if list.head.Next == nil {
+ list.head.Next = interval
+ }
+ interval.Prev = list.tail.Prev
+ if list.tail.Prev != nil {
+ list.tail.Prev.Next = interval
+ }
+ list.tail.Prev = interval
+}
+
+func (list *IntervalList[T]) Overlay(startOffset, stopOffset, tsNs int64, value T) {
+ if startOffset >= stopOffset {
+ return
+ }
+ interval := &Interval[T]{
+ StartOffset: startOffset,
+ StopOffset: stopOffset,
+ TsNs: tsNs,
+ Value: value,
+ }
+
+ list.Lock.Lock()
+ defer list.Lock.Unlock()
+
+ list.overlayInterval(interval)
+}
+
+func (list *IntervalList[T]) InsertInterval(startOffset, stopOffset, tsNs int64, value T) {
+ interval := &Interval[T]{
+ StartOffset: startOffset,
+ StopOffset: stopOffset,
+ TsNs: tsNs,
+ Value: value,
+ }
+
+ list.Lock.Lock()
+ defer list.Lock.Unlock()
+
+ value.SetStartStop(startOffset, stopOffset)
+ list.insertInterval(interval)
+}
+
+func (list *IntervalList[T]) insertInterval(interval *Interval[T]) {
+ prev := list.head
+ next := prev.Next
+
+ for interval.StartOffset < interval.StopOffset {
+ if next == nil {
+ // add to the end
+ list.insertBetween(prev, interval, list.tail)
+ break
+ }
+
+ // interval is ahead of the next
+ if interval.StopOffset <= next.StartOffset {
+ list.insertBetween(prev, interval, next)
+ break
+ }
+
+ // interval is after the next
+ if next.StopOffset <= interval.StartOffset {
+ prev = next
+ next = next.Next
+ continue
+ }
+
+ // intersecting next and interval
+ if interval.TsNs >= next.TsNs {
+ // interval is newer
+ if next.StartOffset < interval.StartOffset {
+ // left side of next is ahead of interval
+ t := &Interval[T]{
+ StartOffset: next.StartOffset,
+ StopOffset: interval.StartOffset,
+ TsNs: next.TsNs,
+ Value: next.Value.Clone().(T),
+ }
+ t.Value.SetStartStop(t.StartOffset, t.StopOffset)
+ list.insertBetween(prev, t, interval)
+ next.StartOffset = interval.StartOffset
+ next.Value.SetStartStop(next.StartOffset, next.StopOffset)
+ prev = t
+ }
+ if interval.StopOffset < next.StopOffset {
+ // right side of next is after interval
+ next.StartOffset = interval.StopOffset
+ next.Value.SetStartStop(next.StartOffset, next.StopOffset)
+ list.insertBetween(prev, interval, next)
+ break
+ } else {
+ // next is covered
+ prev.Next = interval
+ next = next.Next
+ }
+ } else {
+ // next is newer
+ if interval.StartOffset < next.StartOffset {
+ // left side of interval is ahead of next
+ t := &Interval[T]{
+ StartOffset: interval.StartOffset,
+ StopOffset: next.StartOffset,
+ TsNs: interval.TsNs,
+ Value: interval.Value.Clone().(T),
+ }
+ t.Value.SetStartStop(t.StartOffset, t.StopOffset)
+ list.insertBetween(prev, t, next)
+ interval.StartOffset = next.StartOffset
+ interval.Value.SetStartStop(interval.StartOffset, interval.StopOffset)
+ }
+ if next.StopOffset < interval.StopOffset {
+ // right side of interval is after next
+ interval.StartOffset = next.StopOffset
+ interval.Value.SetStartStop(interval.StartOffset, interval.StopOffset)
+ } else {
+ // interval is covered
+ break
+ }
+ }
+
+ }
+}
+
+func (list *IntervalList[T]) insertBetween(a, interval, b *Interval[T]) {
+ a.Next = interval
+ b.Prev = interval
+ if a != list.head {
+ interval.Prev = a
+ }
+ if b != list.tail {
+ interval.Next = b
+ }
+}
+
+func (list *IntervalList[T]) overlayInterval(interval *Interval[T]) {
+
+ //t := list.head
+ //for ; t.Next != nil; t = t.Next {
+ // if t.TsNs > interval.TsNs {
+ // println("writes is out of order", t.TsNs-interval.TsNs, "ns")
+ // }
+ //}
+
+ p := list.head
+ for ; p.Next != nil && p.Next.StopOffset <= interval.StartOffset; p = p.Next {
+ }
+ q := list.tail
+ for ; q.Prev != nil && q.Prev.StartOffset >= interval.StopOffset; q = q.Prev {
+ }
+
+ // left side
+ // interval after p.Next start
+ if p.Next != nil && p.Next.StartOffset < interval.StartOffset {
+ t := &Interval[T]{
+ StartOffset: p.Next.StartOffset,
+ StopOffset: interval.StartOffset,
+ TsNs: p.Next.TsNs,
+ Value: p.Next.Value,
+ }
+ p.Next = t
+ if p != list.head {
+ t.Prev = p
+ }
+ t.Next = interval
+ interval.Prev = t
+ } else {
+ p.Next = interval
+ if p != list.head {
+ interval.Prev = p
+ }
+ }
+
+ // right side
+ // interval ends before p.Prev
+ if q.Prev != nil && interval.StopOffset < q.Prev.StopOffset {
+ t := &Interval[T]{
+ StartOffset: interval.StopOffset,
+ StopOffset: q.Prev.StopOffset,
+ TsNs: q.Prev.TsNs,
+ Value: q.Prev.Value,
+ }
+ q.Prev = t
+ if q != list.tail {
+ t.Next = q
+ }
+ interval.Next = t
+ t.Prev = interval
+ } else {
+ q.Prev = interval
+ if q != list.tail {
+ interval.Next = q
+ }
+ }
+
+}
+
+func (list *IntervalList[T]) Len() int {
+ list.Lock.Lock()
+ defer list.Lock.Unlock()
+
+ var count int
+ for t := list.head; t != nil; t = t.Next {
+ count++
+ }
+ return count - 1
+}
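
The two write paths behave differently, which is easy to miss: Overlay splices purely by offset, so the latest call wins regardless of timestamps, while InsertInterval merges by TsNs, so out-of-order adds still resolve newest-wins (the property the "use insert interval since adding chunk could be out of order" fix relies on). A sketch of the contrast, using the IntervalInt helper from the test file below:

    // Overlay: the later call wins in the overlap, even with an older ts
    a := NewIntervalList[IntervalInt]()
    a.Overlay(50, 150, 2, 2) // ts=2
    a.Overlay(0, 100, 1, 1)  // older ts, but later call
    // a is now [0,100)@1, [100,150)@2

    // InsertInterval: the newer timestamp wins, even when the older
    // write arrives later
    b := NewIntervalList[IntervalInt]()
    b.InsertInterval(50, 150, 2, 2)
    b.InsertInterval(0, 100, 1, 1)
    // b is now [0,50)@1, [50,150)@2
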
diff --git a/weed/filer/interval_list_test.go b/weed/filer/interval_list_test.go
new file mode 100644
index 000000000..dea510fed
--- /dev/null
+++ b/weed/filer/interval_list_test.go
@@ -0,0 +1,327 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type IntervalInt int
+
+func (i IntervalInt) SetStartStop(start, stop int64) {
+}
+func (i IntervalInt) Clone() IntervalValue {
+ return i
+}
+
+func TestIntervalList_Overlay(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(0, 100, 1, 1)
+ list.Overlay(50, 150, 2, 2)
+ list.Overlay(200, 250, 3, 3)
+ list.Overlay(225, 250, 4, 4)
+ list.Overlay(175, 210, 5, 5)
+ list.Overlay(0, 25, 6, 6)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 6, list.Len())
+ println()
+ list.Overlay(50, 150, 7, 7)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 6, list.Len())
+}
+
+func TestIntervalList_Overlay2(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(0, 50, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+}
+
+func TestIntervalList_Overlay3(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ assert.Equal(t, 1, list.Len())
+
+ list.Overlay(0, 60, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_Overlay4(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(0, 100, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 1, list.Len())
+}
+
+func TestIntervalList_Overlay5(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(0, 110, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 1, list.Len())
+}
+
+func TestIntervalList_Overlay6(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(50, 110, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 1, list.Len())
+}
+
+func TestIntervalList_Overlay7(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(50, 90, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_Overlay8(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(60, 90, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 3, list.Len())
+}
+
+func TestIntervalList_Overlay9(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(60, 100, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_Overlay10(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(50, 100, 1, 1)
+ list.Overlay(60, 110, 2, 2)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_Overlay11(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.Overlay(0, 100, 1, 1)
+ list.Overlay(100, 110, 2, 2)
+ list.Overlay(0, 90, 3, 3)
+ list.Overlay(0, 80, 4, 4)
+ list.Overlay(0, 90, 5, 5)
+ list.Overlay(90, 90, 6, 6)
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 3, list.Len())
+}
+
+func TestIntervalList_insertInterval1(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(200, 250, 3, 3)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_insertInterval2(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(0, 25, 3, 3)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_insertInterval3(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(0, 75, 3, 3)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 3, list.Len())
+}
+
+func TestIntervalList_insertInterval4(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(0, 225, 3, 3)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_insertInterval5(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(0, 225, 5, 5)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_insertInterval6(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(0, 275, 1, 1)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 5, list.Len())
+}
+
+func TestIntervalList_insertInterval7(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(75, 275, 1, 1)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 4, list.Len())
+}
+
+func TestIntervalList_insertInterval8(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(75, 275, 3, 3)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 4, list.Len())
+}
+
+func TestIntervalList_insertInterval9(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+
+ list.InsertInterval(50, 150, 2, 2)
+ list.InsertInterval(200, 250, 4, 4)
+
+ list.InsertInterval(50, 150, 3, 3)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 2, list.Len())
+}
+
+func TestIntervalList_insertInterval10(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+
+ list.InsertInterval(50, 100, 2, 2)
+
+ list.InsertInterval(200, 300, 4, 4)
+
+ list.InsertInterval(100, 200, 5, 5)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 3, list.Len())
+}
+
+func TestIntervalList_insertInterval11(t *testing.T) {
+ list := NewIntervalList[IntervalInt]()
+
+ list.InsertInterval(0, 64, 1, 1)
+
+ list.InsertInterval(72, 136, 3, 3)
+
+ list.InsertInterval(64, 128, 2, 2)
+
+ list.InsertInterval(68, 72, 4, 4)
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 4, list.Len())
+}
+
+type IntervalStruct struct {
+ x int
+ start int64
+ stop int64
+}
+
+func newIntervalStruct(i int) IntervalStruct {
+ return IntervalStruct{
+ x: i,
+ }
+}
+
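+// note: value receivers below, so SetStartStop's updates are discarded; these tests only assert fragment counts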
+func (i IntervalStruct) SetStartStop(start, stop int64) {
+ i.start, i.stop = start, stop
+}
+func (i IntervalStruct) Clone() IntervalValue {
+ return &IntervalStruct{
+ x: i.x,
+ start: i.start,
+ stop: i.stop,
+ }
+}
+
+func TestIntervalList_insertIntervalStruct(t *testing.T) {
+ list := NewIntervalList[IntervalStruct]()
+
+ list.InsertInterval(0, 64, 1, newIntervalStruct(1))
+
+ list.InsertInterval(64, 72, 2, newIntervalStruct(2))
+
+ list.InsertInterval(72, 136, 3, newIntervalStruct(3))
+
+ list.InsertInterval(64, 68, 4, newIntervalStruct(4))
+
+ for p := list.Front(); p != nil; p = p.Next {
+ fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value)
+ }
+ assert.Equal(t, 4, list.Len())
+}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 9d1fab20a..27e8f79a6 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -16,8 +16,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
- chunkViews []*ChunkView
- readerLock sync.Mutex
+ chunkViews *IntervalList[*ChunkView]
fileSize int64
readerCache *ReaderCache
readerPattern *ReaderPattern
@@ -89,7 +88,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
-func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
@@ -108,44 +107,58 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
- c.readerLock.Lock()
- defer c.readerLock.Unlock()
+ c.chunkViews.Lock.Lock()
+ defer c.chunkViews.Lock.Unlock()
+
+ // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+ n, _, err = c.doReadAt(p, offset)
+ return
+}
+
+func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) {
+
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
+ c.chunkViews.Lock.Lock()
+ defer c.chunkViews.Lock.Unlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
-func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
+func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) {
startOffset, remaining := offset, int64(len(p))
- var nextChunks []*ChunkView
- for i, chunk := range c.chunkViews {
+ var nextChunks *Interval[*ChunkView]
+ for x := c.chunkViews.Front(); x != nil; x = x.Next {
+ chunk := x.Value
if remaining <= 0 {
break
}
- if i+1 < len(c.chunkViews) {
- nextChunks = c.chunkViews[i+1:]
+ if x.Next != nil {
+ nextChunks = x.Next
}
- if startOffset < chunk.LogicOffset {
- gap := chunk.LogicOffset - startOffset
- glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset)
+ if startOffset < chunk.ViewOffset {
+ gap := chunk.ViewOffset - startOffset
+ glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
n += zero(p, startOffset-offset, gap)
- startOffset, remaining = chunk.LogicOffset, remaining-gap
+ startOffset, remaining = chunk.ViewOffset, remaining-gap
if remaining <= 0 {
break
}
}
- // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
- chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
+ // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+ chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
if chunkStart >= chunkStop {
continue
}
- // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
+ // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
+ bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
+ ts = chunk.ModifiedTsNs
copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return copied, err
+ return copied, ts, err
}
n += copied
@@ -177,7 +190,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) {
+func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
if c.readerPattern.IsRandomMode() {
n, err := c.readerCache.chunkCache.ReadChunkAt(buffer, chunkView.FileId, offset)
@@ -187,16 +200,14 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
}
- n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
+ n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.ViewOffset == 0)
if c.lastChunkFid != chunkView.FileId {
- if chunkView.Offset == 0 { // start of a new chunk
+ if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
c.readerCache.UnCache(c.lastChunkFid)
- c.readerCache.MaybeCache(nextChunkViews)
- } else {
- if len(nextChunkViews) >= 1 {
- c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning
- }
+ }
+ if nextChunkViews != nil {
+ c.readerCache.MaybeCache(nextChunkViews) // prefetch from the next chunk onward
}
}
}
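ReadAtWithTime returns, besides the byte count, the ModifiedTsNs of the last chunk it copied from; the mount layer can then ask the dirty pages only for data written at or after that timestamp. A hedged sketch of that flow (mirroring filehandle_read.go further down; variable names are illustrative):

    // flushed chunks first, then newer dirty data overlays the buffer
    n, ts, err := reader.ReadAtWithTime(buff, offset)
    if err != nil && err != io.EOF {
        return int64(n), err
    }
    // only dirty fragments with TsNs >= ts are copied over buff
    maxStop := fh.dirtyPages.ReadDirtyDataAt(buff, offset, ts)
    _ = maxStop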
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
index 29bd47ea4..f61d68a6d 100644
--- a/weed/filer/reader_at_test.go
+++ b/weed/filer/reader_at_test.go
@@ -5,7 +5,6 @@ import (
"io"
"math"
"strconv"
- "sync"
"testing"
)
@@ -34,42 +33,40 @@ func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
func TestReaderAt(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 1,
- stop: 2,
- fileId: "1",
- chunkSize: 9,
- },
- {
- start: 3,
- stop: 4,
- fileId: "3",
- chunkSize: 1,
- },
- {
- start: 5,
- stop: 6,
- fileId: "5",
- chunkSize: 2,
- },
- {
- start: 7,
- stop: 9,
- fileId: "7",
- chunkSize: 2,
- },
- {
- start: 9,
- stop: 10,
- fileId: "9",
- chunkSize: 2,
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 1,
+ stop: 2,
+ fileId: "1",
+ chunkSize: 9,
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 3,
+ stop: 4,
+ fileId: "3",
+ chunkSize: 1,
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 5,
+ stop: 6,
+ fileId: "5",
+ chunkSize: 2,
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 7,
+ stop: 9,
+ fileId: "7",
+ chunkSize: 2,
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 9,
+ stop: 10,
+ fileId: "9",
+ chunkSize: 2,
+ })
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- readerLock: sync.Mutex{},
fileSize: 10,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@@ -86,7 +83,7 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp
if data == nil {
data = make([]byte, size)
}
- n, err := readerAt.doReadAt(data, offset)
+ n, _, err := readerAt.doReadAt(data, offset)
if expectedN != n {
t.Errorf("unexpected read size: %d, expect: %d", n, expectedN)
@@ -101,24 +98,22 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp
func TestReaderAt0(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 2,
- stop: 5,
- fileId: "1",
- chunkSize: 9,
- },
- {
- start: 7,
- stop: 9,
- fileId: "2",
- chunkSize: 9,
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 2,
+ stop: 5,
+ fileId: "1",
+ chunkSize: 9,
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 7,
+ stop: 9,
+ fileId: "2",
+ chunkSize: 9,
+ })
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- readerLock: sync.Mutex{},
fileSize: 10,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@@ -135,18 +130,16 @@ func TestReaderAt0(t *testing.T) {
func TestReaderAt1(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 2,
- stop: 5,
- fileId: "1",
- chunkSize: 9,
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 2,
+ stop: 5,
+ fileId: "1",
+ chunkSize: 9,
+ })
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- readerLock: sync.Mutex{},
fileSize: 20,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@@ -164,24 +157,22 @@ func TestReaderAt1(t *testing.T) {
}
func TestReaderAtGappedChunksDoNotLeak(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 2,
- stop: 3,
- fileId: "1",
- chunkSize: 5,
- },
- {
- start: 7,
- stop: 9,
- fileId: "1",
- chunkSize: 4,
- },
- }
+ visibles := NewIntervalList[*VisibleInterval]()
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 2,
+ stop: 3,
+ fileId: "1",
+ chunkSize: 5,
+ })
+ addVisibleInterval(visibles, &VisibleInterval{
+ start: 7,
+ stop: 9,
+ fileId: "1",
+ chunkSize: 4,
+ })
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- readerLock: sync.Mutex{},
fileSize: 9,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
@@ -193,8 +184,7 @@ func TestReaderAtGappedChunksDoNotLeak(t *testing.T) {
func TestReaderAtSparseFileDoesNotLeak(t *testing.T) {
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals([]VisibleInterval{}, 0, math.MaxInt64),
- readerLock: sync.Mutex{},
+ chunkViews: ViewFromVisibleIntervals(NewIntervalList[*VisibleInterval](), 0, math.MaxInt64),
fileSize: 3,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index cb89c03c5..0a7c83de7 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -43,7 +43,7 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
-func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
+func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
if rc.lookupFileIdFn == nil {
return
}
@@ -55,7 +55,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
return
}
- for _, chunkView := range chunkViews {
+ for x := chunkViews; x != nil; x = x.Next {
+ chunkView := x.Value
if _, found := rc.downloaders[chunkView.FileId]; found {
continue
}
@@ -65,7 +66,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
return
}
- // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
+ // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
// cache this chunk if not yet
cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
go cacher.startCaching()
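MaybeCache now receives a position in the interval list rather than a slice, and walks forward from that node until the downloader limit is reached. A small sketch (inside package filer, assuming a populated list):

    // prefetch the chunks that follow the one just opened
    if x := chunkViews.Front(); x != nil && x.Next != nil {
        rc.MaybeCache(x.Next) // walks x.Next, x.Next.Next, ... while capacity remains
    }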
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index f28341be4..d49784686 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -6,7 +6,6 @@ import (
"golang.org/x/exp/slices"
"io"
"math"
- "sort"
"strings"
"sync"
"time"
@@ -78,7 +77,8 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
fileId2Url := make(map[string][]string)
- for _, chunkView := range chunkViews {
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunkView := x.Value
var urlStrings []string
var err error
for _, backoff := range getLookupFileIdBackoffSchedule {
@@ -102,29 +102,30 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
- for _, chunkView := range chunkViews {
- if offset < chunkView.LogicOffset {
- gap := chunkView.LogicOffset - offset
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunkView := x.Value
+ if offset < chunkView.ViewOffset {
+ gap := chunkView.ViewOffset - offset
remaining -= gap
- glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
+ glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
err := writeZero(writer, gap)
if err != nil {
- return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
+ return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
}
- offset = chunkView.LogicOffset
+ offset = chunkView.ViewOffset
}
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
- err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
- offset += int64(chunkView.Size)
- remaining -= int64(chunkView.Size)
+ err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+ offset += int64(chunkView.ViewSize)
+ remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
return fmt.Errorf("read chunk: %v", err)
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
- downloadThrottler.MaybeSlowdown(int64(chunkView.Size))
+ downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
}
if remaining > 0 {
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
@@ -167,14 +168,15 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
idx := 0
- for _, chunkView := range chunkViews {
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunkView := x.Value
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
- n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
+ n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@@ -185,7 +187,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
// ---------------- ChunkStreamReader ----------------------------------
type ChunkStreamReader struct {
- chunkViews []*ChunkView
+ chunkView *Interval[*ChunkView]
totalSize int64
logicOffset int64
buffer []byte
@@ -201,17 +203,15 @@ var _ = io.ReaderAt(&ChunkStreamReader{})
func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
- slices.SortFunc(chunkViews, func(a, b *ChunkView) bool {
- return a.LogicOffset < b.LogicOffset
- })
var totalSize int64
- for _, chunk := range chunkViews {
- totalSize += int64(chunk.Size)
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunk := x.Value
+ totalSize += int64(chunk.ViewSize)
}
return &ChunkStreamReader{
- chunkViews: chunkViews,
+ chunkView: chunkViews.Front(),
lookupFileId: lookupFileIdFn,
totalSize: totalSize,
}
@@ -290,7 +290,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
}
func insideChunk(offset int64, chunk *ChunkView) bool {
- return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
+ return chunk.ViewOffset <= offset && offset < chunk.ViewOffset+int64(chunk.ViewSize)
}
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
@@ -300,48 +300,22 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
}
// fmt.Printf("fetch for offset %d\n", offset)
-
- // need to seek to a different chunk
- currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
- return offset < c.chunkViews[i].LogicOffset
- })
- if currentChunkIndex == len(c.chunkViews) {
- // not found
- if insideChunk(offset, c.chunkViews[0]) {
- // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
- currentChunkIndex = 0
- } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
- currentChunkIndex = len(c.chunkViews) - 1
- // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
- } else {
- return io.EOF
- }
- } else if currentChunkIndex > 0 {
- if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
- // good hit
- } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
- currentChunkIndex -= 1
- // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
- } else {
- // glog.Fatalf("unexpected1 offset %d", offset)
- return fmt.Errorf("unexpected1 offset %d", offset)
- }
- } else {
- // glog.Fatalf("unexpected2 offset %d", offset)
- return fmt.Errorf("unexpected2 offset %d", offset)
+ if c.chunkView == nil {
+ return io.EOF
+ }
+ // only advance when the current chunk does not contain the offset,
+ // otherwise the very first read would skip the first chunk
+ if !insideChunk(offset, c.chunkView.Value) {
+ c.chunkView = c.chunkView.Next
+ if c.chunkView == nil {
+ return io.EOF
+ }
+ }
// positioning within the new chunk
- chunk := c.chunkViews[currentChunkIndex]
+ chunk := c.chunkView.Value
if insideChunk(offset, chunk) {
- if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
+ if c.isBufferEmpty() || c.bufferOffset != chunk.ViewOffset {
if err = c.fetchChunkToBuffer(chunk); err != nil {
return
}
}
} else {
- // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
- return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
+ // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+ return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
}
return
}
@@ -355,7 +329,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
@@ -372,10 +346,10 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
c.buffer = buffer.Bytes()
- c.bufferOffset = chunkView.LogicOffset
+ c.bufferOffset = chunkView.ViewOffset
c.chunk = chunkView.FileId
- // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+ // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize))
return nil
}
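With the sorted slice and sort.Search gone, ChunkStreamReader keeps a single cursor into the interval list and only moves it forward, which fits its streaming callers; backward seeks outside the buffered chunk are no longer repositioned by search. A short usage sketch, assuming the reader's io.Reader side and a hypothetical process helper:

    buf := make([]byte, 4096)
    for {
        n, err := streamReader.Read(buf) // advances the chunkView cursor as chunks drain
        if n > 0 {
            process(buf[:n]) // placeholder for the caller's handling
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
    }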
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index 78e7b7877..56c97549f 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
- "time"
)
type ChunkedDirtyPages struct {
@@ -38,11 +37,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
return dirtyPages
}
-func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
+func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
- pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
+ pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
return
}
@@ -58,28 +57,27 @@ func (pages *ChunkedDirtyPages) FlushData() error {
return nil
}
-func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) {
if !pages.hasWrites {
return
}
- return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
+ return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs)
}
-func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
+func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) {
- mtime := time.Now().UnixNano()
defer cleanupFn()
fileFullPath := pages.fh.FullPath()
fileName := fileFullPath.Name()
- chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
+ chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs)
if err != nil {
glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
pages.lastErr = err
return
}
- chunk.ModifiedTsNs = mtime
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
+ pages.fh.entryChunkGroup.AddChunk(chunk)
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
}
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index b6ec3d2da..67298b047 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -5,50 +5,60 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "golang.org/x/exp/slices"
- "golang.org/x/sync/semaphore"
- "math"
+ "os"
"sync"
)
type FileHandleId uint64
+var IsDebugFileReadWrite = false
+
type FileHandle struct {
- fh FileHandleId
- counter int64
- entry *LockedEntry
- entryLock sync.Mutex
- inode uint64
- wfs *WFS
+ fh FileHandleId
+ counter int64
+ entry *LockedEntry
+ entryLock sync.Mutex
+ entryChunkGroup *filer.ChunkGroup
+ inode uint64
+ wfs *WFS
// cache file has been written to
- dirtyMetadata bool
- dirtyPages *PageWriter
- entryViewCache []filer.VisibleInterval
- reader *filer.ChunkReadAt
- contentType string
- handle uint64
- orderedMutex *semaphore.Weighted
+ dirtyMetadata bool
+ dirtyPages *PageWriter
+ reader *filer.ChunkReadAt
+ contentType string
+ handle uint64
+ sync.Mutex
isDeleted bool
+
+ // for debugging
+ mirrorFile *os.File
}
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
fh := &FileHandle{
- fh: handleId,
- counter: 1,
- inode: inode,
- wfs: wfs,
- orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
+ fh: handleId,
+ counter: 1,
+ inode: inode,
+ wfs: wfs,
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
- if entry != nil {
- entry.Attributes.FileSize = filer.FileSize(entry)
- }
fh.entry = &LockedEntry{
Entry: entry,
}
+ if entry != nil {
+ fh.SetEntry(entry)
+ }
+
+ if IsDebugFileReadWrite && entry != nil {
+ var err error
+ fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
+ if err != nil {
+ println("failed to create mirror:", err.Error())
+ }
+ }
return fh
}
@@ -63,6 +73,17 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
}
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
+ if entry != nil {
+ fileSize := filer.FileSize(entry)
+ entry.Attributes.FileSize = fileSize
+ var resolveManifestErr error
+ fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
+ if resolveManifestErr != nil {
+ glog.Warningf("failed to resolve manifest chunks in %+v", entry)
+ }
+ } else {
+ glog.Fatalf("setting file handle entry to nil")
+ }
fh.entry.SetEntry(entry)
}
@@ -78,43 +99,17 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
return
}
- // find the earliest incoming chunk
- newChunks := chunks
- earliestChunk := newChunks[0]
- for i := 1; i < len(newChunks); i++ {
- if lessThan(earliestChunk, newChunks[i]) {
- earliestChunk = newChunks[i]
- }
- }
-
- // pick out-of-order chunks from existing chunks
- for _, chunk := range fh.entry.GetChunks() {
- if lessThan(earliestChunk, chunk) {
- chunks = append(chunks, chunk)
- }
- }
-
- // sort incoming chunks
- slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
- return lessThan(a, b)
- })
-
- glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
-
- fh.entry.AppendChunks(newChunks)
- fh.entryViewCache = nil
+ fh.entry.AppendChunks(chunks)
}
-func (fh *FileHandle) CloseReader() {
- if fh.reader != nil {
- _ = fh.reader.Close()
- fh.reader = nil
- }
-}
+func (fh *FileHandle) ReleaseHandle() {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
-func (fh *FileHandle) Release() {
fh.dirtyPages.Destroy()
- fh.CloseReader()
+ if IsDebugFileReadWrite {
+ fh.mirrorFile.Close()
+ }
}
func lessThan(a, b *filer_pb.FileChunk) bool {
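IsDebugFileReadWrite mirrors every file handle to a plain file under /tmp/sw so reads and writes can be compared bit-for-bit against local disk; the directory must exist, and open failures only print. A hedged sketch of turning it on (the environment variable is made up for illustration):

    func init() {
        if os.Getenv("SW_DEBUG_RW") != "" { // hypothetical switch
            _ = os.MkdirAll("/tmp/sw", 0700)
            mount.IsDebugFileReadWrite = true
        }
    }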
diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go
index cc5885ffc..f0051f061 100644
--- a/weed/mount/filehandle_map.go
+++ b/weed/mount/filehandle_map.go
@@ -65,7 +65,7 @@ func (i *FileHandleToInode) ReleaseByInode(inode uint64) {
if fh.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fh.fh)
- fh.Release()
+ fh.ReleaseHandle()
}
}
}
@@ -82,7 +82,7 @@ func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) {
if fhHandle.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fhHandle.fh)
- fhHandle.Release()
+ fhHandle.ReleaseHandle()
}
}
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index a316a16cd..be6d5d984 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -17,18 +17,20 @@ func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
}
-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
- maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs int64) (maxStop int64) {
+ maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset, tsNs)
return
}
-func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
fileFullPath := fh.FullPath()
entry := fh.GetEntry()
if entry == nil {
- return 0, io.EOF
+ return 0, 0, io.EOF
}
if entry.IsInRemoteOnly() {
@@ -36,43 +38,28 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
newEntry, err := fh.downloadRemoteEntry(entry)
if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
- return 0, err
+ return 0, 0, err
}
entry = newEntry
}
- fileSize := int64(filer.FileSize(entry))
+ fileSize := int64(entry.Attributes.FileSize)
+ if fileSize == 0 {
+ fileSize = int64(filer.FileSize(entry))
+ }
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
- return 0, io.EOF
+ return 0, 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
- return int64(totalRead), nil
- }
-
- var chunkResolveErr error
- if fh.entryViewCache == nil {
- fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize)
- if chunkResolveErr != nil {
- return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
- }
- fh.CloseReader()
- }
-
- if fh.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize)
- glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
- for _, chunkView := range chunkViews {
- glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
- }
- fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
+ return int64(totalRead), 0, nil
}
- totalRead, err := fh.reader.ReadAt(buff, offset)
+ totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
@@ -80,7 +67,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
- return int64(totalRead), err
+ return int64(totalRead), ts, err
}
func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index 1f31b5300..c9470c440 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -29,35 +29,35 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
return pw
}
-func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
+ pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs)
offset += writeSize
data = data[writeSize:]
}
}
-func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) {
- pw.randomWriter.AddPage(offset, data, isSequential)
+func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) {
+ pw.randomWriter.AddPage(offset, data, isSequential, tsNs)
}
func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData()
}
-func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64, tsNs int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
+ maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset, tsNs)
offset += readSize
data = data[readSize:]
diff --git a/weed/mount/page_writer/activity_score.go b/weed/mount/page_writer/activity_score.go
new file mode 100644
index 000000000..22da87e37
--- /dev/null
+++ b/weed/mount/page_writer/activity_score.go
@@ -0,0 +1,39 @@
+package page_writer
+
+import "time"
+
+type ActivityScore struct {
+ lastActiveTsNs int64
+ decayedActivenessScore int64
+}
+
+func NewActivityScore() *ActivityScore {
+ return &ActivityScore{}
+}
+
+// pointer receivers: MarkRead and MarkWrite must mutate the score in place
+func (as *ActivityScore) MarkRead() {
+ now := time.Now().UnixNano()
+ deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
+ as.lastActiveTsNs = now
+
+ as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 256
+ if as.decayedActivenessScore < 0 {
+ as.decayedActivenessScore = 0
+ }
+}
+
+func (as *ActivityScore) MarkWrite() {
+ now := time.Now().UnixNano()
+ deltaTime := (now - as.lastActiveTsNs) >> 30 // about number of seconds
+ as.lastActiveTsNs = now
+
+ as.decayedActivenessScore = as.decayedActivenessScore>>deltaTime + 1024
+ if as.decayedActivenessScore < 0 {
+ as.decayedActivenessScore = 0
+ }
+}
+
+func (as *ActivityScore) ActivityScore() int64 {
+ deltaTime := (time.Now().UnixNano() - as.lastActiveTsNs) >> 30 // about number of seconds
+ return as.decayedActivenessScore >> deltaTime
+}
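The decay works by shifting: a nanosecond delta shifted right by 30 is roughly seconds (2^30 ns ≈ 1.07 s), and each elapsed second halves the accumulated score before a read adds 256 or a write adds 1024. A worked sketch:

    as := NewActivityScore()
    as.MarkWrite()              // 0 + 1024
    as.MarkWrite()              // same second: 1024 + 1024 = 2048
    // about 3 idle seconds later:
    score := as.ActivityScore() // 2048 >> 3 = 256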
diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go
index a9d64c8e4..005385c1a 100644
--- a/weed/mount/page_writer/chunk_interval_list.go
+++ b/weed/mount/page_writer/chunk_interval_list.go
@@ -8,6 +8,7 @@ import (
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
+ TsNs int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
@@ -42,10 +43,14 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
return list
}
-func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
+func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) {
+ if startOffset >= stopOffset {
+ return
+ }
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
+ TsNs: tsNs,
}
list.addInterval(interval)
}
@@ -62,50 +67,54 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
+ //t := list.head
+ //for ; t.next != nil; t = t.next {
+ // if t.TsNs > interval.TsNs {
+ // println("writes is out of order", t.TsNs-interval.TsNs, "ns")
+ // }
+ //}
+
p := list.head
- for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
+ for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
- for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
+ for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev {
}
- if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
- // merge p and q together
- p.stopOffset = q.stopOffset
- unlinkNodesBetween(p, q.next)
- return
+ // left side
+ // interval after p.next start
+ if p.next.StartOffset < interval.StartOffset {
+ t := &ChunkWrittenInterval{
+ StartOffset: p.next.StartOffset,
+ stopOffset: interval.StartOffset,
+ TsNs: p.next.TsNs,
+ }
+ p.next = t
+ t.prev = p
+ t.next = interval
+ interval.prev = t
+ } else {
+ p.next = interval
+ interval.prev = p
}
- if interval.StartOffset <= p.stopOffset {
- // merge new interval into p
- p.stopOffset = interval.stopOffset
- unlinkNodesBetween(p, q)
- return
- }
- if q.StartOffset <= interval.stopOffset {
- // merge new interval into q
- q.StartOffset = interval.StartOffset
- unlinkNodesBetween(p, q)
- return
- }
-
- // add the new interval between p and q
- unlinkNodesBetween(p, q)
- p.next = interval
- interval.prev = p
- q.prev = interval
- interval.next = q
-
-}
-// unlinkNodesBetween remove all nodes after start and before stop, exclusive
-func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
- if start.next == stop {
- return
+ // right side
+ // the new interval ends before q.prev ends: keep the trailing piece of q.prev
+ if interval.stopOffset < q.prev.stopOffset {
+ t := &ChunkWrittenInterval{
+ StartOffset: interval.stopOffset,
+ stopOffset: q.prev.stopOffset,
+ TsNs: q.prev.TsNs,
+ }
+ q.prev = t
+ t.next = q
+ interval.next = t
+ t.prev = interval
+ } else {
+ q.prev = interval
+ interval.next = q
}
- start.next.prev = nil
- start.next = stop
- stop.prev.next = nil
- stop.prev = start
+
}
func (list *ChunkWrittenIntervalList) size() int {
diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go
index b22f5eb5d..eb1d5ff46 100644
--- a/weed/mount/page_writer/chunk_interval_list_test.go
+++ b/weed/mount/page_writer/chunk_interval_list_test.go
@@ -10,40 +10,72 @@ func Test_PageChunkWrittenIntervalList(t *testing.T) {
assert.Equal(t, 0, list.size(), "empty list")
- list.MarkWritten(0, 5)
+ list.MarkWritten(0, 5, 1)
assert.Equal(t, 1, list.size(), "one interval")
- list.MarkWritten(0, 5)
+ list.MarkWritten(0, 5, 2)
assert.Equal(t, 1, list.size(), "duplicated interval2")
- list.MarkWritten(95, 100)
+ list.MarkWritten(95, 100, 3)
assert.Equal(t, 2, list.size(), "two intervals")
- list.MarkWritten(50, 60)
+ list.MarkWritten(50, 60, 4)
assert.Equal(t, 3, list.size(), "three intervals")
- list.MarkWritten(50, 55)
- assert.Equal(t, 3, list.size(), "three intervals merge")
+ list.MarkWritten(50, 55, 5)
+ assert.Equal(t, 4, list.size(), "three intervals merge")
- list.MarkWritten(40, 50)
- assert.Equal(t, 3, list.size(), "three intervals grow forward")
+ list.MarkWritten(40, 50, 6)
+ assert.Equal(t, 5, list.size(), "three intervals grow forward")
- list.MarkWritten(50, 65)
- assert.Equal(t, 3, list.size(), "three intervals grow backward")
+ list.MarkWritten(50, 65, 7)
+ assert.Equal(t, 4, list.size(), "three intervals grow backward")
- list.MarkWritten(70, 80)
- assert.Equal(t, 4, list.size(), "four intervals")
+ list.MarkWritten(70, 80, 8)
+ assert.Equal(t, 5, list.size(), "four intervals")
- list.MarkWritten(60, 70)
- assert.Equal(t, 3, list.size(), "three intervals merged")
+ list.MarkWritten(60, 70, 9)
+ assert.Equal(t, 6, list.size(), "three intervals merged")
- list.MarkWritten(59, 71)
- assert.Equal(t, 3, list.size(), "covered three intervals")
+ list.MarkWritten(59, 71, 10)
+ assert.Equal(t, 6, list.size(), "covered three intervals")
- list.MarkWritten(5, 59)
- assert.Equal(t, 2, list.size(), "covered two intervals")
+ list.MarkWritten(5, 59, 11)
+ assert.Equal(t, 5, list.size(), "covered two intervals")
- list.MarkWritten(70, 99)
- assert.Equal(t, 1, list.size(), "covered one intervals")
+ list.MarkWritten(70, 99, 12)
+ assert.Equal(t, 5, list.size(), "covered one intervals")
}
+
+type interval struct {
+ start int64
+ stop int64
+ expected bool
+}
+
+func Test_PageChunkWrittenIntervalList1(t *testing.T) {
+ list := newChunkWrittenIntervalList()
+ inputs := []interval{
+ {1, 5, true},
+ {2, 3, true},
+ }
+ for i, input := range inputs {
+ list.MarkWritten(input.start, input.stop, int64(i)+1)
+ actual := hasData(list, 0, 4)
+ if actual != input.expected {
+ t.Errorf("input [%d,%d) expected %v actual %v", input.start, input.stop, input.expected, actual)
+ }
+ }
+}
+
+func hasData(usage *ChunkWrittenIntervalList, chunkStartOffset, x int64) bool {
+ for t := usage.head.next; t != usage.tail; t = t.next {
+ logicStart := chunkStartOffset + t.StartOffset
+ logicStop := chunkStartOffset + t.stopOffset
+ if logicStart <= x && x < logicStop {
+ return true
+ }
+ }
+ return false
+}
diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go
index 44f879afc..7cddcf69e 100644
--- a/weed/mount/page_writer/dirty_pages.go
+++ b/weed/mount/page_writer/dirty_pages.go
@@ -1,9 +1,9 @@
package page_writer
type DirtyPages interface {
- AddPage(offset int64, data []byte, isSequential bool)
+ AddPage(offset int64, data []byte, isSequential bool, tsNs int64)
FlushData() error
- ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
+ ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64)
Destroy()
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)
diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go
index 4e8f31425..32d246deb 100644
--- a/weed/mount/page_writer/page_chunk.go
+++ b/weed/mount/page_writer/page_chunk.go
@@ -4,13 +4,13 @@ import (
"io"
)
-type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
+type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func())
type PageChunk interface {
FreeResource()
- WriteDataAt(src []byte, offset int64) (n int)
- ReadDataAt(p []byte, off int64) (maxStop int64)
+ WriteDataAt(src []byte, offset int64, tsNs int64) (n int)
+ ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool
- WrittenSize() int64
+ ActivityScore() int64
SaveContent(saveFn SaveToStorageFunc)
}
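SaveToStorageFunc now carries the write timestamp of each saved range, so the uploaded chunk can keep its original ModifiedTsNs. A minimal sketch of a consumer (pageChunk and the printing body are placeholders, not the mount code's real saver):

    saveFn := func(reader io.Reader, offset, size, modifiedTsNs int64, cleanupFn func()) {
        defer cleanupFn()
        data, err := io.ReadAll(reader)
        if err != nil {
            return
        }
        fmt.Printf("persist [%d,%d) written at %d ns, %d bytes\n", offset, offset+size, modifiedTsNs, len(data))
    }
    pageChunk.SaveContent(saveFn)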
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
index 8cccded67..1ec8cecb4 100644
--- a/weed/mount/page_writer/page_chunk_mem.go
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -19,6 +19,7 @@ type MemChunk struct {
usage *ChunkWrittenIntervalList
chunkSize int64
logicChunkIndex LogicChunkIndex
+ activityScore *ActivityScore
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
@@ -28,6 +29,7 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
chunkSize: chunkSize,
buf: mem.Allocate(int(chunkSize)),
usage: newChunkWrittenIntervalList(),
+ activityScore: NewActivityScore(),
}
}
@@ -39,29 +41,37 @@ func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf)
}
-func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
+func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
mc.Lock()
defer mc.Unlock()
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
- mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+ mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
+ mc.activityScore.MarkWrite()
+
return
}
-func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
mc.RLock()
defer mc.RUnlock()
memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
- logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
+ logicStart := max(off, memChunkBaseOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
- copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
- maxStop = max(maxStop, logicStop)
+ if t.TsNs >= tsNs {
+ copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
+ maxStop = max(maxStop, logicStop)
+ } else {
+ println("read old data1", tsNs-t.TsNs, "ns")
+ }
}
}
+ mc.activityScore.MarkRead()
+
return
}
@@ -72,11 +82,8 @@ func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
-func (mc *MemChunk) WrittenSize() int64 {
- mc.RLock()
- defer mc.RUnlock()
-
- return mc.usage.WrittenSize()
+func (mc *MemChunk) ActivityScore() int64 {
+ return mc.activityScore.ActivityScore()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
@@ -88,7 +95,7 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
- saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
+ saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() {
})
}
}
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index bf2cdb256..6cedc64df 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -15,12 +15,12 @@ var (
type ActualChunkIndex int
type SwapFile struct {
- dir string
- file *os.File
- logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
- logicToActualChunkIndexLock sync.Mutex
- chunkSize int64
- freeActualChunkList []ActualChunkIndex
+ dir string
+ file *os.File
+ chunkSize int64
+ chunkTrackingLock sync.Mutex
+ activeChunkCount int
+ freeActualChunkList []ActualChunkIndex
}
type SwapFileChunk struct {
@@ -29,14 +29,15 @@ type SwapFileChunk struct {
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
+ activityScore *ActivityScore
+ //memChunk *MemChunk
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
- dir: dir,
- file: nil,
- logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
- chunkSize: chunkSize,
+ dir: dir,
+ file: nil,
+ chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
@@ -46,7 +47,7 @@ func (sf *SwapFile) FreeResource() {
}
}
-func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
+func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
@@ -55,70 +56,98 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
return nil
}
}
- sf.logicToActualChunkIndexLock.Lock()
- defer sf.logicToActualChunkIndexLock.Unlock()
- actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
- if !found {
- if len(sf.freeActualChunkList) > 0 {
- actualChunkIndex = sf.freeActualChunkList[0]
- sf.freeActualChunkList = sf.freeActualChunkList[1:]
- } else {
- actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
- }
- sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
+ sf.chunkTrackingLock.Lock()
+ defer sf.chunkTrackingLock.Unlock()
+
+ sf.activeChunkCount++
+
+ // assign a new physical chunk
+ var actualChunkIndex ActualChunkIndex
+ if len(sf.freeActualChunkList) > 0 {
+ actualChunkIndex = sf.freeActualChunkList[0]
+ sf.freeActualChunkList = sf.freeActualChunkList[1:]
+ } else {
+ actualChunkIndex = ActualChunkIndex(sf.activeChunkCount)
}
- return &SwapFileChunk{
+ swapFileChunk := &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
+ activityScore: NewActivityScore(),
+ // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize),
}
+
+ // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf)
+ return swapFileChunk
}
func (sc *SwapFileChunk) FreeResource() {
- sc.swapfile.logicToActualChunkIndexLock.Lock()
- defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
sc.Lock()
defer sc.Unlock()
+ sc.swapfile.chunkTrackingLock.Lock()
+ defer sc.swapfile.chunkTrackingLock.Unlock()
+
sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
- delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
+ sc.swapfile.activeChunkCount--
+ // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile)
}
-func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
+func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
sc.Lock()
defer sc.Unlock()
+ // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex)
+
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
- if err == nil {
- sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
- } else {
+ sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
+ if err != nil {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
+ //sc.memChunk.WriteDataAt(src, offset, tsNs)
+ sc.activityScore.MarkWrite()
+
return
}
-func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
sc.RLock()
defer sc.RUnlock()
+ // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex)
+
+ //memCopy := make([]byte, len(p))
+ //copy(memCopy, p)
+
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
- actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
- if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
- glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
- break
+ if t.TsNs >= tsNs {
+ actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
+ if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ } else {
+ println("read old data2", tsNs-t.TsNs, "ns")
}
- maxStop = max(maxStop, logicStop)
}
}
+ //sc.memChunk.ReadDataAt(memCopy, off, tsNs)
+ //if bytes.Compare(memCopy, p) != 0 {
+ // println("read wrong data from swap file", off, sc.logicChunkIndex)
+ //}
+
+ sc.activityScore.MarkRead()
+
return
}
@@ -128,27 +157,27 @@ func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
-func (sc *SwapFileChunk) WrittenSize() int64 {
- sc.RLock()
- defer sc.RUnlock()
- return sc.usage.WrittenSize()
+func (sc *SwapFileChunk) ActivityScore() int64 {
+ return sc.activityScore.ActivityScore()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
+ sc.RLock()
+ defer sc.RUnlock()
+
if saveFn == nil {
return
}
- sc.Lock()
- defer sc.Unlock()
-
+ // println(sc.logicChunkIndex, "|", "save")
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
- sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
- reader := util.NewBytesReader(data)
- saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
- })
+ n, _ := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
+ if n > 0 {
+ reader := util.NewBytesReader(data[:n])
+ saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, int64(n), t.TsNs, func() {
+ })
+ }
mem.Free(data)
}
- sc.usage = newChunkWrittenIntervalList()
}
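
The swap-file rewrite above stamps every written interval with the write's nanosecond timestamp, and ReadDataAt skips any interval older than the caller's snapshot. A minimal sketch of that visibility rule, isolated from the locking and file I/O (names are illustrative, not the actual fields):

type writtenInterval struct {
	startOffset, stopOffset int64 // within the logical chunk
	tsNs                    int64 // when these bytes were written
}

// visible mirrors the `t.TsNs >= tsNs` guard in ReadDataAt: a reader
// that started at readTsNs never overlays older swap-file bytes onto
// newer data it already copied from uploaded chunks.
func visible(t writtenInterval, readTsNs int64) bool {
	return t.tsNs >= readTsNs
}
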
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 252dddc06..6065f2f76 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "math"
"sync"
"sync/atomic"
)
@@ -55,7 +56,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
return t
}
-func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) {
+
up.chunksLock.Lock()
defer up.chunksLock.Unlock()
@@ -65,33 +67,39 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
if !found {
if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit
- fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
- for lci, mc := range up.writableChunks {
- chunkFullness := mc.WrittenSize()
- if fullness < chunkFullness {
- fullestChunkIndex = lci
- fullness = chunkFullness
+ laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
+ for wci, wc := range up.writableChunks {
+ activityScore := wc.ActivityScore()
+ if lowestActivityScore > activityScore {
+ laziestChunkIndex = wci
+ lowestActivityScore = activityScore
}
}
- up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
- // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
+ up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex)
+ // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
}
if isSequential &&
len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
- pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+ pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex)
}
up.writableChunks[logicChunkIndex] = pageChunk
}
- n = pageChunk.WriteDataAt(p, off)
+ //if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed {
+ // println("found already sealed chunk", logicChunkIndex)
+ //}
+ //if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading {
+ // println("found active read chunk", logicChunkIndex)
+ //}
+ n = pageChunk.WriteDataAt(p, off, tsNs)
up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return
}
-func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
up.chunksLock.Lock()
@@ -103,12 +111,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
// read from sealed chunks first
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
- sealedChunk.referenceCounter++
- }
- if found {
- maxStop = sealedChunk.chunk.ReadDataAt(p, off)
+ maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
- sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
@@ -116,7 +120,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
if !found {
return
}
- writableMaxStop := writableChunk.ReadDataAt(p, off)
+ writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)
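
SaveDataAt now evicts the least active writable chunk rather than the fullest one. The commit describes ActivityScore as a time-decayed counter; a plausible self-contained sketch, with the decay policy (halve the score per idle second) assumed rather than taken from the real implementation:

package page_writer_sketch

import "time"

type activitySketch struct {
	lastTsNs int64
	score    int64
}

// mark decays the score by half for each elapsed second of inactivity,
// then counts the new access, so recently busy chunks score highest.
func (a *activitySketch) mark() {
	now := time.Now().UnixNano()
	if a.lastTsNs > 0 {
		if halvings := (now - a.lastTsNs) / int64(time.Second); halvings > 62 {
			a.score = 0
		} else {
			a.score >>= uint(halvings)
		}
	}
	a.score++
	a.lastTsNs = now
}

func (a *activitySketch) value() int64 { return a.score }

The eviction loop above then simply keeps the minimum score across writableChunks, which is what laziestChunkIndex tracks.
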
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
index 27da7036d..2d803f6af 100644
--- a/weed/mount/page_writer/upload_pipeline_test.go
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -31,14 +31,14 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
- uploadPipeline.SaveDataAt(p, i, false)
+ uploadPipeline.SaveDataAt(p, i, false, 0)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
- uploadPipeline.MaybeReadDataAt(p, i)
+ uploadPipeline.MaybeReadDataAt(p, i, 0)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go
index 1d58e0852..7dc3c6b50 100644
--- a/weed/mount/weedfs_attr.go
+++ b/weed/mount/weedfs_attr.go
@@ -20,12 +20,12 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
_, _, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, entry)
+ wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
return status
} else {
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry())
+ wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true)
out.Nlink = 0
return fuse.OK
}
@@ -75,7 +75,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
// set the new chunks and reset entry cache
entry.Chunks = chunks
if fh != nil {
- fh.entryViewCache = nil
+ fh.entryChunkGroup.SetChunks(chunks)
}
}
entry.Attributes.Mtime = time.Now().Unix()
@@ -114,7 +114,11 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
}
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
+ size, includeSize := input.GetSize()
+ if includeSize {
+ out.Attr.Size = size
+ }
+ wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry, !includeSize)
if fh != nil {
fh.dirtyMetadata = true
@@ -139,12 +143,14 @@ func (wfs *WFS) setRootAttr(out *fuse.AttrOut) {
out.Nlink = 1
}
-func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) {
+func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry, calculateSize bool) {
out.Ino = inode
if entry.Attributes != nil && entry.Attributes.Inode != 0 {
out.Ino = entry.Attributes.Inode
}
- out.Size = filer.FileSize(entry)
+ if calculateSize {
+ out.Size = filer.FileSize(entry)
+ }
if entry.FileMode()&os.ModeSymlink != 0 {
out.Size = uint64(len(entry.Attributes.SymlinkTarget))
}
@@ -194,7 +200,7 @@ func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.
out.Generation = 1
out.EntryValid = 1
out.AttrValid = 1
- wfs.setAttrByPbEntry(&out.Attr, inode, entry)
+ wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
}
func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) {
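
The new calculateSize flag exists because a truncating SetAttr already carries the authoritative size, while recomputing it from the not-yet-refreshed chunk list could report the old length. A hedged stand-in for the decision, with sizeOf standing in for filer.FileSize:

// effectiveSize trusts an explicit size from SetAttr and only falls
// back to deriving the size from the chunk list when none was given.
func effectiveSize(sizeOf func() uint64, explicit uint64, haveExplicit bool) uint64 {
	if haveExplicit {
		return explicit // ftruncate/extend just set it
	}
	return sizeOf() // walk the chunk list, O(#chunks)
}
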
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index bc092a252..e3f841b02 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -1,8 +1,8 @@
package mount
import (
- "context"
"net/http"
+ "time"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -44,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
// lock source and target file handles
- fhOut.orderedMutex.Acquire(context.Background(), 1)
- defer fhOut.orderedMutex.Release(1)
+ fhOut.Lock()
+ defer fhOut.Unlock()
fhOut.entryLock.Lock()
defer fhOut.entryLock.Unlock()
@@ -54,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
if fhIn.fh != fhOut.fh {
- fhIn.orderedMutex.Acquire(context.Background(), 1)
- defer fhIn.orderedMutex.Release(1)
+ fhIn.Lock()
+ defer fhIn.Unlock()
fhIn.entryLock.Lock()
defer fhIn.entryLock.Unlock()
}
@@ -88,7 +88,7 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
// put data at the specified offset in target file
fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
fhOut.entry.Content = nil
- fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode())
+ fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
fhOut.dirtyMetadata = true
written = uint32(totalRead)
diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go
index 9d6402f96..93fc65247 100644
--- a/weed/mount/weedfs_file_lseek.go
+++ b/weed/mount/weedfs_file_lseek.go
@@ -1,7 +1,6 @@
package mount
import (
- "context"
"syscall"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -36,8 +35,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ fh.Lock()
+ defer fh.Unlock()
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
@@ -56,17 +55,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return ENXIO
}
- // refresh view cache if necessary
- if fh.entryViewCache == nil {
- var err error
- fh.entryViewCache, err = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), fh.entry.GetChunks(), 0, fileSize)
- if err != nil {
- return fuse.EIO
- }
- }
-
// search chunks for the offset
- found, offset := searchChunks(fh, offset, fileSize, in.Whence)
+ found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence)
if found {
out.Offset = uint64(offset)
return fuse.OK
@@ -82,30 +72,3 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return fuse.OK
}
-
-// searchChunks goes through all chunks to find the correct offset
-func searchChunks(fh *FileHandle, offset, fileSize int64, whence uint32) (found bool, out int64) {
- chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, offset, fileSize)
-
- for _, chunkView := range chunkViews {
- if offset < chunkView.LogicOffset {
- if whence == SEEK_HOLE {
- out = offset
- } else {
- out = chunkView.LogicOffset
- }
-
- return true, out
- }
-
- if offset >= chunkView.LogicOffset && offset < chunkView.Offset+int64(chunkView.Size) && whence == SEEK_DATA {
- out = offset
-
- return true, out
- }
-
- offset += int64(chunkView.Size)
- }
-
- return
-}
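
The deleted searchChunks is subsumed by ChunkGroup.SearchChunks. The general SEEK_DATA/SEEK_HOLE scan over sorted, non-overlapping data extents looks roughly like this (extent is a stand-in type; the defaults for the not-found case stay with the caller, as above):

const (
	SEEK_DATA = 3
	SEEK_HOLE = 4
)

type extent struct{ start, stop int64 } // one run of real data

func seek(extents []extent, offset, fileSize int64, whence uint32) (found bool, out int64) {
	pos := offset
	for _, e := range extents {
		if pos < e.start { // pos sits in a hole before this extent
			if whence == SEEK_HOLE {
				return true, pos
			}
			return true, e.start // next data begins here
		}
		if pos < e.stop { // pos sits inside this data extent
			if whence == SEEK_DATA {
				return true, pos
			}
			pos = e.stop // candidate hole start; recheck next extent
		}
	}
	if whence == SEEK_HOLE && pos < fileSize {
		return true, pos // trailing implicit hole before EOF
	}
	return false, 0 // caller applies the EOF/ENXIO defaults
}
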
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index 8375f9a5d..cedece137 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -1,7 +1,8 @@
package mount
import (
- "context"
+ "bytes"
+ "fmt"
"io"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -40,8 +41,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ fh.Lock()
+ defer fh.Unlock()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
@@ -50,6 +51,23 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.EIO
}
+ if IsDebugFileReadWrite {
+ // print(".")
+ mirrorData := make([]byte, totalRead)
+ fh.mirrorFile.ReadAt(mirrorData, offset)
+ if bytes.Compare(mirrorData, buff[:totalRead]) != 0 {
+
+ againBuff := make([]byte, len(buff))
+ againRead, _ := readDataByFileHandle(againBuff, fh, offset)
+ againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0
+ againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0
+
+ fmt.Printf("\ncompare %v [%d,%d) size:%d againSame:%v againCorrect:%v\n", fh.mirrorFile.Name(), offset, offset+totalRead, totalRead, againSame, againCorrect)
+ //fmt.Printf("read mirrow data: %v\n", mirrorData)
+ //fmt.Printf("read actual data: %v\n", buff[:totalRead])
+ }
+ }
+
return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
@@ -59,9 +77,9 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
fhIn.lockForRead(offset, size)
defer fhIn.unlockForRead(offset, size)
- n, err := fhIn.readFromChunks(buff, offset)
+ n, tsNs, err := fhIn.readFromChunks(buff, offset)
if err == nil || err == io.EOF {
- maxStop := fhIn.readFromDirtyPages(buff, offset)
+ maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
n = max(maxStop-offset, n)
}
if err == io.EOF {
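
readFromChunks now returns a timestamp along with the byte count. The natural reading, though only implied by this diff, is the newest ModifiedTsNs among the chunk data just copied; readFromDirtyPages then uses it as a floor so buffered pages older than already-uploaded data are never overlaid. A sketch of that assumed contract:

type chunkRead struct{ modifiedTsNs int64 }

// newestTs reports the floor passed to the dirty-page overlay: any
// in-memory page older than this predates the uploaded data and
// must not win.
func newestTs(reads []chunkRead) (ts int64) {
	for _, c := range reads {
		if c.modifiedTsNs > ts {
			ts = c.modifiedTsNs
		}
	}
	return
}
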
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 7b7c66680..ac18e05ea 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -89,8 +89,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
}
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ fh.Lock()
+ defer fh.Unlock()
// flush works at fh level
fileFullPath := fh.FullPath()
@@ -145,9 +145,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.GetChunks()))
- for i, chunk := range entry.GetChunks() {
- glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
+ //for i, chunk := range entry.GetChunks() {
+ // glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
+ //}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks())
@@ -158,6 +158,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
entry.Chunks = append(chunks, manifestChunks...)
+ fh.entryChunkGroup.SetChunks(entry.Chunks)
wfs.mapPbIdFromLocalToFiler(request.Entry)
defer wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -181,5 +182,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
return fuse.EIO
}
+ if IsDebugFileReadWrite {
+ fh.mirrorFile.Sync()
+ }
+
return fuse.OK
}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 7b13d54ff..5a9a21ded 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -1,10 +1,10 @@
package mount
import (
- "context"
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"syscall"
+ "time"
)
/**
@@ -46,8 +46,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
- fh.orderedMutex.Acquire(context.Background(), 1)
- defer fh.orderedMutex.Release(1)
+ tsNs := time.Now().UnixNano()
+
+ fh.Lock()
+ defer fh.Unlock()
entry := fh.GetEntry()
if entry == nil {
@@ -59,7 +61,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
+ fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs)
written = uint32(len(data))
@@ -70,5 +72,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyMetadata = true
+ if IsDebugFileReadWrite {
+ // print("+")
+ fh.mirrorFile.WriteAt(data, offset)
+ }
+
return written, fuse.OK
}
diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go
index e18a4a358..4c8470245 100644
--- a/weed/mount/weedfs_write.go
+++ b/weed/mount/weedfs_write.go
@@ -13,7 +13,7 @@ import (
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
- return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) {
+ return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, err, data := operation.UploadWithRetry(
wfs,
@@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
wfs.chunkCache.SetChunk(fileId, data)
}
- chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs)
return chunk, nil
}
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index ed38dfa6b..0c3e29a43 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -45,13 +45,13 @@ type UploadResult struct {
RetryCount int `json:"-"`
}
-func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk {
fid, _ := filer_pb.ToFileIdObject(fileId)
return &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
Size: uint64(uploadResult.Size),
- ModifiedTsNs: time.Now().UnixNano(),
+ ModifiedTsNs: tsNs,
ETag: uploadResult.ContentMd5,
CipherKey: uploadResult.CipherKey,
IsCompressed: uploadResult.Gzip > 0,
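
Stamping ModifiedTsNs at upload time was fragile: buffered writes can finish uploading out of order, letting an older write carry the newer timestamp and win overlap resolution. Capturing time.Now().UnixNano() once at Write() and threading it through to ToPbFileChunk ties last-writer-wins to write order. An illustrative resolution rule:

type fileChunkSketch struct{ offset, size, modifiedTsNs int64 }

// newerWins resolves an overlapping pair by write timestamp, the
// invariant preserved regardless of which chunk uploaded first.
func newerWins(a, b fileChunkSketch) fileChunkSketch {
	if b.modifiedTsNs >= a.modifiedTsNs {
		return b
	}
	return a
}
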
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
index ec0e80b2e..9682ca623 100644
--- a/weed/replication/repl_util/replication_util.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -7,9 +7,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
-func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
+func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
- for _, chunk := range chunkViews {
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunk := x.Value
fileUrls, err := filerSource.LookupFileId(chunk.FileId)
if err != nil {
@@ -20,7 +21,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
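
CopyFromChunkViews now walks filer.IntervalList, the generic linked list added in weed/filer/interval_list.go. Sketched from the Front()/Next/Value usage above (the real type also carries locking and insert helpers), a node looks like:

// A linked list keeps out-of-order interval inserts cheap once the
// split point is found, which a flat []ChunkView slice cannot offer.
type Interval[T any] struct {
	StartOffset, StopOffset int64
	TsNs                    int64
	Value                   T
	Next                    *Interval[T]
}

// Typical traversal, as in CopyFromChunkViews:
//   for x := list.Front(); x != nil; x = x.Next { use(x.Value) }
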
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index c671deb76..8b3fc45fb 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -256,7 +256,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
- return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) {
+ return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
var fileId string
var uploadResult *operation.UploadResult
@@ -290,7 +290,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
return nil, err
}
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
}
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index bd8761077..bb5659437 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
// Save to chunk manifest structure
- fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
+ fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0, time.Now().UnixNano())}
// fmt.Printf("uploaded: %+v\n", uploadResult)
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 95920583d..cc43eba64 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -214,5 +214,5 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
if uploadResult.Size == 0 {
return nil, nil
}
- return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil
+ return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset, time.Now().UnixNano())}, nil
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 80b882181..79416d519 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -102,14 +102,14 @@ func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
func (fi *FileInfo) Sys() interface{} { return nil }
type WebDavFile struct {
- fs *WebDavFileSystem
- name string
- isDirectory bool
- off int64
- entry *filer_pb.Entry
- entryViewCache []filer.VisibleInterval
- reader io.ReaderAt
- bufWriter *buffered_writer.BufferedWriteCloser
+ fs *WebDavFileSystem
+ name string
+ isDirectory bool
+ off int64
+ entry *filer_pb.Entry
+ visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
+ reader io.ReaderAt
+ bufWriter *buffered_writer.BufferedWriteCloser
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@@ -381,7 +381,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name)
}
-func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
f.fs,
@@ -413,7 +413,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
}
func (f *WebDavFile) Write(buf []byte) (int, error) {
@@ -439,7 +439,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
var chunk *filer_pb.FileChunk
- chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset)
+ chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
if flushErr != nil {
return fmt.Errorf("%s upload result: %v", f.name, flushErr)
@@ -498,7 +498,7 @@ func (f *WebDavFile) Close() error {
if f.entry != nil {
f.entry = nil
- f.entryViewCache = nil
+ f.visibleIntervals = nil
}
return err
@@ -521,12 +521,12 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
if fileSize == 0 {
return 0, io.EOF
}
- if f.entryViewCache == nil {
- f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
+ if f.visibleIntervals == nil {
+ f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
+ chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index 11cc64d78..07f3fd9c1 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -117,7 +117,7 @@ type ItemEntry struct {
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
timeNowAtSec := time.Now().Unix()
- return fileCount, errCount, doTraverseBfsAndSaving(c.env, nil, path, false,
+ return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if c.modifyTimeAgoAtSec > 0 {
if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 559b11cd3..be9bd2db2 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -211,7 +211,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
}
}()
- return doTraverseBfsAndSaving(c.env, nil, c.getCollectFilerFilePath(), false,
+ return doTraverseBfsAndSaving(c.env, c.writer, c.getCollectFilerFilePath(), false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if *c.verbose && entry.Entry.IsDirectory {
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))