Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/filechunk_group.go      | 148
-rw-r--r--  weed/filer/filechunk_group_test.go |  36
-rw-r--r--  weed/filer/filechunk_manifest.go   |   4
-rw-r--r--  weed/filer/filechunk_section.go    | 119
-rw-r--r--  weed/filer/filechunks.go           | 248
-rw-r--r--  weed/filer/filechunks_read.go      | 106
-rw-r--r--  weed/filer/filechunks_read_test.go |  86
-rw-r--r--  weed/filer/filechunks_test.go      | 214
-rw-r--r--  weed/filer/filer_notify_append.go  |   2
-rw-r--r--  weed/filer/interval_list.go        | 259
-rw-r--r--  weed/filer/interval_list_test.go   | 327
-rw-r--r--  weed/filer/reader_at.go            |  65
-rw-r--r--  weed/filer/reader_at_test.go       | 142
-rw-r--r--  weed/filer/reader_cache.go         |   7
-rw-r--r--  weed/filer/stream.go               |  90
15 files changed, 1391 insertions, 462 deletions
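
At a high level, this change replaces the slice-based visible-interval bookkeeping with a doubly linked IntervalList[T] and introduces ChunkGroup, which buckets a file's chunks into fixed 256MiB sections (SectionSize) so a read only resolves the sections it actually touches. A minimal sketch of how a caller might drive the new API, using only the signatures added in this patch (the helper readAll and its wiring are illustrative, not part of the patch):

package example

import (
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// readAll sketches the new read path: build one ChunkGroup per open file,
// then serve ranged reads from it. lookupFn resolves file ids to volume
// locations and cache is any chunk_cache.ChunkCache implementation.
func readAll(lookupFn wdclient.LookupFileIdFunctionType, cache chunk_cache.ChunkCache,
	chunks []*filer_pb.FileChunk, fileSize int64) ([]byte, int64, error) {

	// NewChunkGroup resolves manifest chunks up front and assigns each
	// data chunk to every 256MiB section it overlaps.
	group, err := filer.NewChunkGroup(lookupFn, cache, chunks)
	if err != nil {
		return nil, 0, err
	}

	buff := make([]byte, fileSize)
	// Sections with no data are zero-filled; tsNs reports the newest
	// ModifiedTsNs observed among the sections that served the read.
	_, tsNs, err := group.ReadDataAt(fileSize, buff, 0)
	if err != nil {
		return nil, 0, err
	}
	return buff, tsNs, nil
}
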
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go new file mode 100644 index 000000000..5dbf16a5c --- /dev/null +++ b/weed/filer/filechunk_group.go @@ -0,0 +1,148 @@ +package filer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "sync" +) + +type ChunkGroup struct { + lookupFn wdclient.LookupFileIdFunctionType + chunkCache chunk_cache.ChunkCache + manifestChunks []*filer_pb.FileChunk + sections map[SectionIndex]*FileChunkSection + sectionsLock sync.RWMutex +} + +func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { + group := &ChunkGroup{ + lookupFn: lookupFn, + chunkCache: chunkCache, + sections: make(map[SectionIndex]*FileChunkSection), + } + + err := group.SetChunks(chunks) + return group, err +} + +func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error { + + group.sectionsLock.Lock() + defer group.sectionsLock.Unlock() + + sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) + for si := sectionIndexStart; si < sectionIndexStop+1; si++ { + section, found := group.sections[si] + if !found { + section = NewFileChunkSection(si) + group.sections[si] = section + } + section.addChunk(chunk) + } + return nil +} + +func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) { + + group.sectionsLock.RLock() + defer group.sectionsLock.RUnlock() + + sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize) + for si := sectionIndexStart; si < sectionIndexStop+1; si++ { + section, found := group.sections[si] + rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) + if !found { + for i := rangeStart; i < rangeStop; i++ { + buff[i-offset] = 0 + } + continue + } + xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart) + if xErr != nil { + err = xErr + } + n += xn + tsNs = max(tsNs, xTsNs) + } + return +} + +func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { + var dataChunks []*filer_pb.FileChunk + for _, chunk := range chunks { + + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + continue + } + + resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk) + if err != nil { + return err + } + + group.manifestChunks = append(group.manifestChunks, chunk) + dataChunks = append(dataChunks, resolvedChunks...) 
+ } + + for _, chunk := range dataChunks { + sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) + for si := sectionIndexStart; si < sectionIndexStop+1; si++ { + section, found := group.sections[si] + if !found { + section = NewFileChunkSection(si) + group.sections[si] = section + } + section.chunks = append(section.chunks, chunk) + } + } + return nil +} + +const ( + // see weedfs_file_lseek.go + SEEK_DATA uint32 = 3 // seek to next data after the offset + // SEEK_HOLE uint32 = 4 // seek to next hole after the offset +) + +// FIXME: needa tests +func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) { + group.sectionsLock.RLock() + defer group.sectionsLock.RUnlock() + + return group.doSearchChunks(offset, fileSize, whence) +} + +func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) { + + sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize) + if whence == SEEK_DATA { + for si := sectionIndex; si < maxSectionIndex+1; si++ { + section, foundSection := group.sections[si] + if !foundSection { + continue + } + sectionStart := section.DataStartOffset(group, offset, fileSize) + if sectionStart == -1 { + continue + } + return true, sectionStart + } + return false, 0 + } else { + // whence == SEEK_HOLE + for si := sectionIndex; si < maxSectionIndex; si++ { + section, foundSection := group.sections[si] + if !foundSection { + return true, offset + } + holeStart := section.NextStopOffset(group, offset, fileSize) + if holeStart%SectionSize == 0 { + continue + } + return true, holeStart + } + return true, fileSize + } +} diff --git a/weed/filer/filechunk_group_test.go b/weed/filer/filechunk_group_test.go new file mode 100644 index 000000000..d24d66a49 --- /dev/null +++ b/weed/filer/filechunk_group_test.go @@ -0,0 +1,36 @@ +package filer + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestChunkGroup_doSearchChunks(t *testing.T) { + type fields struct { + sections map[SectionIndex]*FileChunkSection + } + type args struct { + offset int64 + fileSize int64 + whence uint32 + } + tests := []struct { + name string + fields fields + args args + wantFound bool + wantOut int64 + }{ + // TODO: Add test cases. 
+ } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + group := &ChunkGroup{ + sections: tt.fields.sections, + } + gotFound, gotOut := group.doSearchChunks(tt.args.offset, tt.args.fileSize, tt.args.whence) + assert.Equalf(t, tt.wantFound, gotFound, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence) + assert.Equalf(t, tt.wantOut, gotOut, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence) + }) + } +} diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 221a11ffe..d9d0331be 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -264,7 +264,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer } } - manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0) + manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0) if err != nil { return nil, err } @@ -275,4 +275,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer return } -type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) +type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go new file mode 100644 index 000000000..60c919569 --- /dev/null +++ b/weed/filer/filechunk_section.go @@ -0,0 +1,119 @@ +package filer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "sync" +) + +const SectionSize = 2 * 1024 * 1024 * 128 // 256MiB +type SectionIndex int64 +type FileChunkSection struct { + sectionIndex SectionIndex + chunks []*filer_pb.FileChunk + visibleIntervals *IntervalList[*VisibleInterval] + chunkViews *IntervalList[*ChunkView] + reader *ChunkReadAt + lock sync.Mutex +} + +func NewFileChunkSection(si SectionIndex) *FileChunkSection { + return &FileChunkSection{ + sectionIndex: si, + } +} + +func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error { + section.lock.Lock() + defer section.lock.Unlock() + + start, stop := max(int64(section.sectionIndex)*SectionSize, chunk.Offset), min(((int64(section.sectionIndex)+1)*SectionSize), chunk.Offset+int64(chunk.Size)) + + section.chunks = append(section.chunks, chunk) + + if section.visibleIntervals != nil { + MergeIntoVisibles(section.visibleIntervals, start, stop, chunk) + } + + if section.visibleIntervals != nil { + section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks) + } + + if section.chunkViews != nil { + MergeIntoChunkViews(section.chunkViews, start, stop, chunk) + } + + return nil +} + +func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) { + if section.visibleIntervals == nil { + section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize) + section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks) + if section.reader != nil { + _ = section.reader.Close() + section.reader = nil + } + } + if section.chunkViews == nil { + section.chunkViews = ViewFromVisibleIntervals(section.visibleIntervals, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize) + } + + if section.reader == nil { + section.reader = NewChunkReaderAtFromClient(group.lookupFn, section.chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, 
fileSize)) + } + section.reader.fileSize = fileSize +} + +func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) { + section.lock.Lock() + defer section.lock.Unlock() + + section.setupForRead(group, fileSize) + + return section.reader.ReadAtWithTime(buff, offset) +} + +func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 { + section.lock.Lock() + defer section.lock.Unlock() + + section.setupForRead(group, fileSize) + + for x := section.visibleIntervals.Front(); x != nil; x = x.Next { + visible := x.Value + if visible.stop <= offset { + continue + } + if offset < visible.start { + return offset + } + return offset + } + return -1 +} + +func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 { + section.lock.Lock() + defer section.lock.Unlock() + + section.setupForRead(group, fileSize) + + isAfterOffset := false + for x := section.visibleIntervals.Front(); x != nil; x = x.Next { + visible := x.Value + if !isAfterOffset { + if visible.stop <= offset { + continue + } + isAfterOffset = true + } + if offset < visible.start { + return offset + } + // now visible.start <= offset + if offset < visible.stop { + offset = visible.stop + } + } + return offset +} diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 061e0757a..d872bd22d 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "github.com/seaweedfs/seaweedfs/weed/wdclient" - "golang.org/x/exp/slices" "math" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -42,7 +41,7 @@ func ETag(entry *filer_pb.Entry) (etag string) { } func ETagEntry(entry *Entry) (etag string) { - if entry.IsInRemoteOnly() { + if entry.IsInRemoteOnly() { return entry.Remote.RemoteETag } if entry.Attr.Md5 == nil { @@ -66,8 +65,15 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64) + compacted, garbage = SeparateGarbageChunks(visibles, chunks) + + return +} + +func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk) { fileIds := make(map[string]bool) - for _, interval := range visibles { + for x := visibles.Front(); x != nil; x = x.Next { + interval := x.Value fileIds[interval.fileId] = true } for _, chunk := range chunks { @@ -77,8 +83,7 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks garbage = append(garbage, chunk) } } - - return + return compacted, garbage } func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { @@ -131,20 +136,39 @@ func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_p } type ChunkView struct { - FileId string - Offset int64 - Size uint64 - LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk - ChunkSize uint64 - CipherKey []byte - IsGzipped bool + FileId string + OffsetInChunk int64 // offset within the chunk + ViewSize uint64 + ViewOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk + ChunkSize uint64 + CipherKey []byte + IsGzipped bool + ModifiedTsNs int64 +} + +func (cv *ChunkView) SetStartStop(start, stop int64) { + 
cv.OffsetInChunk += start - cv.ViewOffset + cv.ViewOffset = start + cv.ViewSize = uint64(stop - start) +} +func (cv *ChunkView) Clone() IntervalValue { + return &ChunkView{ + FileId: cv.FileId, + OffsetInChunk: cv.OffsetInChunk, + ViewSize: cv.ViewSize, + ViewOffset: cv.ViewOffset, + ChunkSize: cv.ChunkSize, + CipherKey: cv.CipherKey, + IsGzipped: cv.IsGzipped, + ModifiedTsNs: cv.ModifiedTsNs, + } } func (cv *ChunkView) IsFullChunk() bool { - return cv.Size == cv.ChunkSize + return cv.ViewSize == cv.ChunkSize } -func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { +func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) { visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size) @@ -152,7 +176,7 @@ func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []* } -func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) { +func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) { stop := offset + size if size == math.MaxInt64 { @@ -162,164 +186,112 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int stop = math.MaxInt64 } - for _, chunk := range visibles { + chunkViews = NewIntervalList[*ChunkView]() + for x := visibles.Front(); x != nil; x = x.Next { + chunk := x.Value chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop) if chunkStart < chunkStop { - views = append(views, &ChunkView{ - FileId: chunk.fileId, - Offset: chunkStart - chunk.start + chunk.chunkOffset, - Size: uint64(chunkStop - chunkStart), - LogicOffset: chunkStart, - ChunkSize: chunk.chunkSize, - CipherKey: chunk.cipherKey, - IsGzipped: chunk.isGzipped, + chunkView := &ChunkView{ + FileId: chunk.fileId, + OffsetInChunk: chunkStart - chunk.start + chunk.offsetInChunk, + ViewSize: uint64(chunkStop - chunkStart), + ViewOffset: chunkStart, + ChunkSize: chunk.chunkSize, + CipherKey: chunk.cipherKey, + IsGzipped: chunk.isGzipped, + ModifiedTsNs: chunk.modifiedTsNs, + } + chunkViews.AppendInterval(&Interval[*ChunkView]{ + StartOffset: chunkStart, + StopOffset: chunkStop, + TsNs: chunk.modifiedTsNs, + Value: chunkView, + Prev: nil, + Next: nil, }) } } - return views + return chunkViews } -func logPrintf(name string, visibles []VisibleInterval) { - - /* - glog.V(0).Infof("%s len %d", name, len(visibles)) - for _, v := range visibles { - glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset) - } - */ -} - -func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) { - - newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.ModifiedTsNs, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed) - - length := len(visibles) - if length == 0 { - return append(visibles, newV) - } - last := visibles[length-1] - if last.stop <= chunk.Offset { - return append(visibles, newV) +func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk) { + + newV := &VisibleInterval{ + start: start, + stop: stop, + fileId: chunk.GetFileIdString(), + modifiedTsNs: chunk.ModifiedTsNs, + offsetInChunk: start - chunk.Offset, // the starting position in the 
chunk + chunkSize: chunk.Size, // size of the chunk + cipherKey: chunk.CipherKey, + isGzipped: chunk.IsCompressed, } - logPrintf(" before", visibles) - // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size) - chunkStop := chunk.Offset + int64(chunk.Size) - for _, v := range visibles { - if v.start < chunk.Offset && chunk.Offset < v.stop { - t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTsNs, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped) - newVisibles = append(newVisibles, t) - // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop) - } - if v.start < chunkStop && chunkStop < v.stop { - t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTsNs, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped) - newVisibles = append(newVisibles, t) - // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop) - } - if chunkStop <= v.start || v.stop <= chunk.Offset { - newVisibles = append(newVisibles, v) - // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop) - } - } - newVisibles = append(newVisibles, newV) - - logPrintf(" append", newVisibles) + visibles.InsertInterval(start, stop, chunk.ModifiedTsNs, newV) +} - for i := len(newVisibles) - 1; i >= 0; i-- { - if i > 0 && newV.start < newVisibles[i-1].start { - newVisibles[i] = newVisibles[i-1] - } else { - newVisibles[i] = newV - break - } +func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, chunk *filer_pb.FileChunk) { + + chunkView := &ChunkView{ + FileId: chunk.GetFileIdString(), + OffsetInChunk: start - chunk.Offset, + ViewSize: uint64(stop - start), + ViewOffset: start, + ChunkSize: chunk.Size, + CipherKey: chunk.CipherKey, + IsGzipped: chunk.IsCompressed, + ModifiedTsNs: chunk.ModifiedTsNs, } - logPrintf(" sorted", newVisibles) - return newVisibles + chunkViews.InsertInterval(start, stop, chunk.ModifiedTsNs, chunkView) } // NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory // If the file chunk content is a chunk manifest -func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) { +func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) { chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset) if err != nil { return } - visibles2 := readResolvedChunks(chunks) - - if true { - return visibles2, err - } - slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool { - if a.ModifiedTsNs == b.ModifiedTsNs { - filer_pb.EnsureFid(a) - filer_pb.EnsureFid(b) - if a.Fid == nil || b.Fid == nil { - return true - } - return a.Fid.FileKey < b.Fid.FileKey - } - return a.ModifiedTsNs < b.ModifiedTsNs - }) - for _, chunk := range chunks { - - // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) - visibles = MergeIntoVisibles(visibles, chunk) - - logPrintf("add", visibles) + visibles2 := readResolvedChunks(chunks, 0, math.MaxInt64) - } - - if len(visibles) != len(visibles2) { - fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2)) - } else { - for i := 0; i < 
len(visibles); i++ { - checkDifference(visibles[i], visibles2[i]) - } - } - - return -} - -func checkDifference(x, y VisibleInterval) { - if x.start != y.start || - x.stop != y.stop || - x.fileId != y.fileId || - x.modifiedTsNs != y.modifiedTsNs { - fmt.Printf("different visible %+v : %+v\n", x, y) - } + return visibles2, err } // find non-overlapping visible intervals // visible interval map to one file chunk type VisibleInterval struct { - start int64 - stop int64 - modifiedTsNs int64 - fileId string - chunkOffset int64 - chunkSize uint64 - cipherKey []byte - isGzipped bool + start int64 + stop int64 + modifiedTsNs int64 + fileId string + offsetInChunk int64 + chunkSize uint64 + cipherKey []byte + isGzipped bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { - return VisibleInterval{ - start: start, - stop: stop, - fileId: fileId, - modifiedTsNs: modifiedTime, - chunkOffset: chunkOffset, // the starting position in the chunk - chunkSize: chunkSize, - cipherKey: cipherKey, - isGzipped: isGzipped, +func (v *VisibleInterval) SetStartStop(start, stop int64) { + v.offsetInChunk += start - v.start + v.start, v.stop = start, stop +} +func (v *VisibleInterval) Clone() IntervalValue { + return &VisibleInterval{ + start: v.start, + stop: v.stop, + modifiedTsNs: v.modifiedTsNs, + fileId: v.fileId, + offsetInChunk: v.offsetInChunk, + chunkSize: v.chunkSize, + cipherKey: v.cipherKey, + isGzipped: v.isGzipped, } } diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go index 8a15f6e7a..8b2d36e12 100644 --- a/weed/filer/filechunks_read.go +++ b/weed/filer/filechunks_read.go @@ -1,14 +1,22 @@ package filer import ( + "container/list" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "golang.org/x/exp/slices" ) -func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { +func readResolvedChunks(chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval]) { var points []*Point for _, chunk := range chunks { + if chunk.IsChunkManifest { + println("This should not happen! 
A manifest chunk found:", chunk.GetFileIdString()) + } + start, stop := max(chunk.Offset, startOffset), min(chunk.Offset+int64(chunk.Size), stopOffset) + if start >= stop { + continue + } points = append(points, &Point{ x: chunk.Offset, ts: chunk.ModifiedTsNs, @@ -33,40 +41,45 @@ func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterva }) var prevX int64 - var queue []*Point + queue := list.New() // points with higher ts are at the tail + visibles = NewIntervalList[*VisibleInterval]() + var prevPoint *Point for _, point := range points { + if queue.Len() > 0 { + prevPoint = queue.Back().Value.(*Point) + } else { + prevPoint = nil + } if point.isStart { - if len(queue) > 0 { - lastIndex := len(queue) - 1 - lastPoint := queue[lastIndex] - if point.x != prevX && lastPoint.ts < point.ts { - visibles = addToVisibles(visibles, prevX, lastPoint, point) + if prevPoint != nil { + if point.x != prevX && prevPoint.ts < point.ts { + addToVisibles(visibles, prevX, prevPoint, point) prevX = point.x } } // insert into queue - for i := len(queue); i >= 0; i-- { - if i == 0 || queue[i-1].ts <= point.ts { - if i == len(queue) { - prevX = point.x + if prevPoint == nil || prevPoint.ts < point.ts { + queue.PushBack(point) + prevX = point.x + } else { + for e := queue.Front(); e != nil; e = e.Next() { + if e.Value.(*Point).ts > point.ts { + queue.InsertBefore(point, e) + break } - queue = addToQueue(queue, i, point) - break } } } else { - lastIndex := len(queue) - 1 - index := lastIndex - var startPoint *Point - for ; index >= 0; index-- { - startPoint = queue[index] - if startPoint.ts == point.ts { - queue = removeFromQueue(queue, index) + isLast := true + for e := queue.Back(); e != nil; e = e.Prev() { + if e.Value.(*Point).ts == point.ts { + queue.Remove(e) break } + isLast = false } - if index == lastIndex && startPoint != nil { - visibles = addToVisibles(visibles, prevX, startPoint, point) + if isLast && prevPoint != nil { + addToVisibles(visibles, prevX, prevPoint, point) prevX = point.x } } @@ -75,37 +88,30 @@ func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterva return } -func removeFromQueue(queue []*Point, index int) []*Point { - for i := index; i < len(queue)-1; i++ { - queue[i] = queue[i+1] - } - queue = queue[:len(queue)-1] - return queue -} - -func addToQueue(queue []*Point, index int, point *Point) []*Point { - queue = append(queue, point) - for i := len(queue) - 1; i > index; i-- { - queue[i], queue[i-1] = queue[i-1], queue[i] - } - return queue -} - -func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval { +func addToVisibles(visibles *IntervalList[*VisibleInterval], prevX int64, startPoint *Point, point *Point) { if prevX < point.x { chunk := startPoint.chunk - visibles = append(visibles, VisibleInterval{ - start: prevX, - stop: point.x, - fileId: chunk.GetFileIdString(), - modifiedTsNs: chunk.ModifiedTsNs, - chunkOffset: prevX - chunk.Offset, - chunkSize: chunk.Size, - cipherKey: chunk.CipherKey, - isGzipped: chunk.IsCompressed, - }) + visible := &VisibleInterval{ + start: prevX, + stop: point.x, + fileId: chunk.GetFileIdString(), + modifiedTsNs: chunk.ModifiedTsNs, + offsetInChunk: prevX - chunk.Offset, + chunkSize: chunk.Size, + cipherKey: chunk.CipherKey, + isGzipped: chunk.IsCompressed, + } + appendVisibleInterfal(visibles, visible) } - return visibles +} + +func appendVisibleInterfal(visibles *IntervalList[*VisibleInterval], visible *VisibleInterval) { + 
visibles.AppendInterval(&Interval[*VisibleInterval]{ + StartOffset: visible.start, + StopOffset: visible.stop, + TsNs: visible.modifiedTsNs, + Value: visible, + }) } type Point struct { diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go index d4bfca72e..c66a874bc 100644 --- a/weed/filer/filechunks_read_test.go +++ b/weed/filer/filechunks_read_test.go @@ -3,6 +3,7 @@ package filer import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "math" "math/rand" "testing" ) @@ -42,9 +43,38 @@ func TestReadResolvedChunks(t *testing.T) { }, } - visibles := readResolvedChunks(chunks) + visibles := readResolvedChunks(chunks, 0, math.MaxInt64) - for _, visible := range visibles { + fmt.Printf("resolved to %d visible intervales\n", visibles.Len()) + for x := visibles.Front(); x != nil; x = x.Next { + visible := x.Value + fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs) + } + +} + +func TestReadResolvedChunks2(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + { + FileId: "c", + Offset: 200, + Size: 50, + ModifiedTsNs: 3, + }, + { + FileId: "e", + Offset: 200, + Size: 25, + ModifiedTsNs: 5, + }, + } + + visibles := readResolvedChunks(chunks, 0, math.MaxInt64) + + fmt.Printf("resolved to %d visible intervales\n", visibles.Len()) + for x := visibles.Front(); x != nil; x = x.Next { + visible := x.Value fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs) } @@ -72,9 +102,10 @@ func TestRandomizedReadResolvedChunks(t *testing.T) { chunks = append(chunks, randomWrite(array, start, size, ts)) } - visibles := readResolvedChunks(chunks) + visibles := readResolvedChunks(chunks, 0, math.MaxInt64) - for _, visible := range visibles { + for x := visibles.Front(); x != nil; x = x.Next { + visible := x.Value for i := visible.start; i < visible.stop; i++ { if array[i] != visible.modifiedTsNs { t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTsNs) @@ -112,9 +143,9 @@ func TestSequentialReadResolvedChunks(t *testing.T) { }) } - visibles := readResolvedChunks(chunks) + visibles := readResolvedChunks(chunks, 0, math.MaxInt64) - fmt.Printf("visibles %d", len(visibles)) + fmt.Printf("visibles %d", visibles.Len()) } @@ -201,9 +232,48 @@ func TestActualReadResolvedChunks(t *testing.T) { }, } - visibles := readResolvedChunks(chunks) + visibles := readResolvedChunks(chunks, 0, math.MaxInt64) + + for x := visibles.Front(); x != nil; x = x.Next { + visible := x.Value + fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs) + } + +} + +func TestActualReadResolvedChunks2(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + { + FileId: "1,e7b96fef48", + Offset: 0, + Size: 184320, + ModifiedTsNs: 1, + }, + { + FileId: "2,22562640b9", + Offset: 184320, + Size: 4096, + ModifiedTsNs: 2, + }, + { + FileId: "2,33562640b9", + Offset: 184320, + Size: 4096, + ModifiedTsNs: 4, + }, + { + FileId: "4,df033e0fe4", + Offset: 188416, + Size: 2097152, + ModifiedTsNs: 3, + }, + } + + visibles := readResolvedChunks(chunks, 0, math.MaxInt64) - for _, visible := range visibles { + for x := visibles.Front(); x != nil; x = x.Next { + visible := x.Value fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTsNs) } diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go index d29e0a600..b448950a9 100644 --- a/weed/filer/filechunks_test.go +++ b/weed/filer/filechunks_test.go @@ -92,7 +92,8 
@@ func TestRandomFileChunksCompact(t *testing.T) { visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64) - for _, v := range visibles { + for visible := visibles.Front(); visible != nil; visible = visible.Next { + v := visible.Value for x := v.start; x < v.stop; x++ { assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId) } @@ -137,7 +138,7 @@ func TestIntervalMerging(t *testing.T) { }, Expected: []*VisibleInterval{ {start: 0, stop: 70, fileId: "b"}, - {start: 70, stop: 100, fileId: "a", chunkOffset: 70}, + {start: 70, stop: 100, fileId: "a", offsetInChunk: 70}, }, }, // case 3: updates overwrite full chunks @@ -174,15 +175,15 @@ func TestIntervalMerging(t *testing.T) { }, Expected: []*VisibleInterval{ {start: 0, stop: 200, fileId: "d"}, - {start: 200, stop: 220, fileId: "c", chunkOffset: 130}, + {start: 200, stop: 220, fileId: "c", offsetInChunk: 130}, }, }, // case 6: same updates { Chunks: []*filer_pb.FileChunk{ {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, ModifiedTsNs: 123}, - {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 123}, - {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 123}, + {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 124}, + {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 125}, }, Expected: []*VisibleInterval{ {start: 0, stop: 100, fileId: "xyz"}, @@ -228,11 +229,17 @@ func TestIntervalMerging(t *testing.T) { for i, testcase := range testcases { log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64) - for x, interval := range intervals { - log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", - i, x, interval.start, interval.stop, interval.fileId) + x := -1 + for visible := intervals.Front(); visible != nil; visible = visible.Next { + x++ + interval := visible.Value + log.Printf("test case %d, interval start=%d, stop=%d, fileId=%s", + i, interval.start, interval.stop, interval.fileId) } - for x, interval := range intervals { + x = -1 + for visible := intervals.Front(); visible != nil; visible = visible.Next { + x++ + interval := visible.Value if interval.start != testcase.Expected[x].start { t.Fatalf("failed on test case %d, interval %d, start %d, expect %d", i, x, interval.start, testcase.Expected[x].start) @@ -245,13 +252,13 @@ func TestIntervalMerging(t *testing.T) { t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s", i, x, interval.fileId, testcase.Expected[x].fileId) } - if interval.chunkOffset != testcase.Expected[x].chunkOffset { - t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d", - i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset) + if interval.offsetInChunk != testcase.Expected[x].offsetInChunk { + t.Fatalf("failed on test case %d, interval %d, offsetInChunk %d, expect %d", + i, x, interval.offsetInChunk, testcase.Expected[x].offsetInChunk) } } - if len(intervals) != len(testcase.Expected) { - t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected)) + if intervals.Len() != len(testcase.Expected) { + t.Fatalf("failed to compact test case %d, len %d expected %d", i, intervals.Len(), len(testcase.Expected)) } } @@ -276,9 +283,9 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 250, Expected: []*ChunkView{ - {Offset: 0, Size: 100, 
FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, - {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200}, + {OffsetInChunk: 0, ViewSize: 100, FileId: "abc", ViewOffset: 0}, + {OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 100}, + {OffsetInChunk: 0, ViewSize: 50, FileId: "fsad", ViewOffset: 200}, }, }, // case 1: updates overwrite full chunks @@ -290,7 +297,7 @@ func TestChunksReading(t *testing.T) { Offset: 50, Size: 100, Expected: []*ChunkView{ - {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50}, + {OffsetInChunk: 50, ViewSize: 100, FileId: "asdf", ViewOffset: 50}, }, }, // case 2: updates overwrite part of previous chunks @@ -302,8 +309,8 @@ func TestChunksReading(t *testing.T) { Offset: 30, Size: 40, Expected: []*ChunkView{ - {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30}, - {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60}, + {OffsetInChunk: 20, ViewSize: 30, FileId: "b", ViewOffset: 30}, + {OffsetInChunk: 57, ViewSize: 10, FileId: "a", ViewOffset: 60}, }, }, // case 3: updates overwrite full chunks @@ -316,8 +323,8 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 200, Expected: []*ChunkView{ - {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50}, + {OffsetInChunk: 0, ViewSize: 50, FileId: "asdf", ViewOffset: 0}, + {OffsetInChunk: 0, ViewSize: 150, FileId: "xxxx", ViewOffset: 50}, }, }, // case 4: updates far away from prev chunks @@ -330,8 +337,8 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 400, Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250}, + {OffsetInChunk: 0, ViewSize: 200, FileId: "asdf", ViewOffset: 0}, + {OffsetInChunk: 0, ViewSize: 150, FileId: "xxxx", ViewOffset: 250}, }, }, // case 5: updates overwrite full chunks @@ -345,21 +352,21 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 220, Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0}, - {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200}, + {OffsetInChunk: 0, ViewSize: 200, FileId: "c", ViewOffset: 0}, + {OffsetInChunk: 130, ViewSize: 20, FileId: "b", ViewOffset: 200}, }, }, // case 6: same updates { Chunks: []*filer_pb.FileChunk{ {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, ModifiedTsNs: 123}, - {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 123}, - {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 123}, + {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, ModifiedTsNs: 124}, + {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, ModifiedTsNs: 125}, }, Offset: 0, Size: 100, Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0}, + {OffsetInChunk: 0, ViewSize: 100, FileId: "xyz", ViewOffset: 0}, }, }, // case 7: edge cases @@ -372,8 +379,8 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 200, Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, + {OffsetInChunk: 0, ViewSize: 100, FileId: "abc", ViewOffset: 0}, + {OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 100}, }, }, // case 8: edge cases @@ -386,9 +393,9 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 300, Expected: []*ChunkView{ - {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0}, - 
{Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90}, - {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190}, + {OffsetInChunk: 0, ViewSize: 90, FileId: "abc", ViewOffset: 0}, + {OffsetInChunk: 0, ViewSize: 100, FileId: "asdf", ViewOffset: 90}, + {OffsetInChunk: 0, ViewSize: 110, FileId: "fsad", ViewOffset: 190}, }, }, // case 9: edge cases @@ -404,12 +411,12 @@ func TestChunksReading(t *testing.T) { Offset: 0, Size: 153578836, Expected: []*ChunkView{ - {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0}, - {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936}, - {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760}, - {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736}, - {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168}, - {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248}, + {OffsetInChunk: 0, ViewSize: 43175936, FileId: "2,111fc2cbfac1", ViewOffset: 0}, + {OffsetInChunk: 0, ViewSize: 52981760 - 43175936, FileId: "2,112a36ea7f85", ViewOffset: 43175936}, + {OffsetInChunk: 0, ViewSize: 72564736 - 52981760, FileId: "4,112d5f31c5e7", ViewOffset: 52981760}, + {OffsetInChunk: 0, ViewSize: 133255168 - 72564736, FileId: "1,113245f0cdb6", ViewOffset: 72564736}, + {OffsetInChunk: 0, ViewSize: 137269248 - 133255168, FileId: "3,1141a70733b5", ViewOffset: 133255168}, + {OffsetInChunk: 0, ViewSize: 153578836 - 137269248, FileId: "1,114201d5bbdb", ViewOffset: 137269248}, }, }, } @@ -420,28 +427,31 @@ func TestChunksReading(t *testing.T) { } log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size) - for x, chunk := range chunks { + x := -1 + for c := chunks.Front(); c != nil; c = c.Next { + x++ + chunk := c.Value log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", - i, x, chunk.Offset, chunk.Size, chunk.FileId) - if chunk.Offset != testcase.Expected[x].Offset { + i, x, chunk.OffsetInChunk, chunk.ViewSize, chunk.FileId) + if chunk.OffsetInChunk != testcase.Expected[x].OffsetInChunk { t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d", - i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset) + i, chunk.FileId, chunk.OffsetInChunk, testcase.Expected[x].OffsetInChunk) } - if chunk.Size != testcase.Expected[x].Size { - t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d", - i, chunk.FileId, chunk.Size, testcase.Expected[x].Size) + if chunk.ViewSize != testcase.Expected[x].ViewSize { + t.Fatalf("failed on read case %d, chunk %s, ViewSize %d, expect %d", + i, chunk.FileId, chunk.ViewSize, testcase.Expected[x].ViewSize) } if chunk.FileId != testcase.Expected[x].FileId { t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s", i, x, chunk.FileId, testcase.Expected[x].FileId) } - if chunk.LogicOffset != testcase.Expected[x].LogicOffset { - t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d", - i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset) + if chunk.ViewOffset != testcase.Expected[x].ViewOffset { + t.Fatalf("failed on read case %d, chunk %d, ViewOffset %d, expect %d", + i, x, chunk.ViewOffset, testcase.Expected[x].ViewOffset) } } - if len(chunks) != len(testcase.Expected) { - t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected)) + if chunks.Len() != len(testcase.Expected) { + 
t.Fatalf("failed to read test case %d, len %d expected %d", i, chunks.Len(), len(testcase.Expected)) } } @@ -467,73 +477,79 @@ func BenchmarkCompactFileChunks(b *testing.B) { } } +func addVisibleInterval(visibles *IntervalList[*VisibleInterval], x *VisibleInterval) { + visibles.AppendInterval(&Interval[*VisibleInterval]{ + StartOffset: x.start, + StopOffset: x.stop, + TsNs: x.modifiedTsNs, + Value: x, + }) +} + func TestViewFromVisibleIntervals(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 0, - stop: 25, - fileId: "fid1", - }, - { - start: 4096, - stop: 8192, - fileId: "fid2", - }, - { - start: 16384, - stop: 18551, - fileId: "fid3", - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 0, + stop: 25, + fileId: "fid1", + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 4096, + stop: 8192, + fileId: "fid2", + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 16384, + stop: 18551, + fileId: "fid3", + }) views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) - if len(views) != len(visibles) { - assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + if views.Len() != visibles.Len() { + assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error") } } func TestViewFromVisibleIntervals2(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 344064, - stop: 348160, - fileId: "fid1", - }, - { - start: 348160, - stop: 356352, - fileId: "fid2", - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 344064, + stop: 348160, + fileId: "fid1", + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 348160, + stop: 356352, + fileId: "fid2", + }) views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) - if len(views) != len(visibles) { - assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + if views.Len() != visibles.Len() { + assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error") } } func TestViewFromVisibleIntervals3(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 1000, - stop: 2000, - fileId: "fid1", - }, - { - start: 3000, - stop: 4000, - fileId: "fid2", - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 1000, + stop: 2000, + fileId: "fid1", + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 3000, + stop: 4000, + fileId: "fid2", + }) views := ViewFromVisibleIntervals(visibles, 1700, 1500) - if len(views) != len(visibles) { - assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + if views.Len() != visibles.Len() { + assert.Equal(t, visibles.Len(), views.Len(), "ViewFromVisibleIntervals error") } } diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index 5c03d4f16..55278c492 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -40,7 +40,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { } // append to existing chunks - entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset)) + entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano())) // update the entry err = f.CreateEntry(context.Background(), entry, false, false, nil, false) diff --git a/weed/filer/interval_list.go b/weed/filer/interval_list.go new file mode 100644 index 
000000000..b3d2a76b9 --- /dev/null +++ b/weed/filer/interval_list.go @@ -0,0 +1,259 @@ +package filer + +import ( + "math" + "sync" +) + +type IntervalValue interface { + SetStartStop(start, stop int64) + Clone() IntervalValue +} + +type Interval[T IntervalValue] struct { + StartOffset int64 + StopOffset int64 + TsNs int64 + Value T + Prev *Interval[T] + Next *Interval[T] +} + +func (interval *Interval[T]) Size() int64 { + return interval.StopOffset - interval.StartOffset +} + +// IntervalList mark written intervals within one page chunk +type IntervalList[T IntervalValue] struct { + head *Interval[T] + tail *Interval[T] + Lock sync.Mutex +} + +func NewIntervalList[T IntervalValue]() *IntervalList[T] { + list := &IntervalList[T]{ + head: &Interval[T]{ + StartOffset: -1, + StopOffset: -1, + }, + tail: &Interval[T]{ + StartOffset: math.MaxInt64, + StopOffset: math.MaxInt64, + }, + } + return list +} + +func (list *IntervalList[T]) Front() (interval *Interval[T]) { + return list.head.Next +} + +func (list *IntervalList[T]) AppendInterval(interval *Interval[T]) { + list.Lock.Lock() + defer list.Lock.Unlock() + + if list.head.Next == nil { + list.head.Next = interval + } + interval.Prev = list.tail.Prev + if list.tail.Prev != nil { + list.tail.Prev.Next = interval + } + list.tail.Prev = interval +} + +func (list *IntervalList[T]) Overlay(startOffset, stopOffset, tsNs int64, value T) { + if startOffset >= stopOffset { + return + } + interval := &Interval[T]{ + StartOffset: startOffset, + StopOffset: stopOffset, + TsNs: tsNs, + Value: value, + } + + list.Lock.Lock() + defer list.Lock.Unlock() + + list.overlayInterval(interval) +} + +func (list *IntervalList[T]) InsertInterval(startOffset, stopOffset, tsNs int64, value T) { + interval := &Interval[T]{ + StartOffset: startOffset, + StopOffset: stopOffset, + TsNs: tsNs, + Value: value, + } + + list.Lock.Lock() + defer list.Lock.Unlock() + + value.SetStartStop(startOffset, stopOffset) + list.insertInterval(interval) +} + +func (list *IntervalList[T]) insertInterval(interval *Interval[T]) { + prev := list.head + next := prev.Next + + for interval.StartOffset < interval.StopOffset { + if next == nil { + // add to the end + list.insertBetween(prev, interval, list.tail) + break + } + + // interval is ahead of the next + if interval.StopOffset <= next.StartOffset { + list.insertBetween(prev, interval, next) + break + } + + // interval is after the next + if next.StopOffset <= interval.StartOffset { + prev = next + next = next.Next + continue + } + + // intersecting next and interval + if interval.TsNs >= next.TsNs { + // interval is newer + if next.StartOffset < interval.StartOffset { + // left side of next is ahead of interval + t := &Interval[T]{ + StartOffset: next.StartOffset, + StopOffset: interval.StartOffset, + TsNs: next.TsNs, + Value: next.Value.Clone().(T), + } + t.Value.SetStartStop(t.StartOffset, t.StopOffset) + list.insertBetween(prev, t, interval) + next.StartOffset = interval.StartOffset + next.Value.SetStartStop(next.StartOffset, next.StopOffset) + prev = t + } + if interval.StopOffset < next.StopOffset { + // right side of next is after interval + next.StartOffset = interval.StopOffset + next.Value.SetStartStop(next.StartOffset, next.StopOffset) + list.insertBetween(prev, interval, next) + break + } else { + // next is covered + prev.Next = interval + next = next.Next + } + } else { + // next is newer + if interval.StartOffset < next.StartOffset { + // left side of interval is ahead of next + t := &Interval[T]{ + StartOffset: 
interval.StartOffset, + StopOffset: next.StartOffset, + TsNs: interval.TsNs, + Value: interval.Value.Clone().(T), + } + t.Value.SetStartStop(t.StartOffset, t.StopOffset) + list.insertBetween(prev, t, next) + interval.StartOffset = next.StartOffset + interval.Value.SetStartStop(interval.StartOffset, interval.StopOffset) + } + if next.StopOffset < interval.StopOffset { + // right side of interval is after next + interval.StartOffset = next.StopOffset + interval.Value.SetStartStop(interval.StartOffset, interval.StopOffset) + } else { + // interval is covered + break + } + } + + } +} + +func (list *IntervalList[T]) insertBetween(a, interval, b *Interval[T]) { + a.Next = interval + b.Prev = interval + if a != list.head { + interval.Prev = a + } + if b != list.tail { + interval.Next = b + } +} + +func (list *IntervalList[T]) overlayInterval(interval *Interval[T]) { + + //t := list.head + //for ; t.Next != nil; t = t.Next { + // if t.TsNs > interval.TsNs { + // println("writes is out of order", t.TsNs-interval.TsNs, "ns") + // } + //} + + p := list.head + for ; p.Next != nil && p.Next.StopOffset <= interval.StartOffset; p = p.Next { + } + q := list.tail + for ; q.Prev != nil && q.Prev.StartOffset >= interval.StopOffset; q = q.Prev { + } + + // left side + // interval after p.Next start + if p.Next != nil && p.Next.StartOffset < interval.StartOffset { + t := &Interval[T]{ + StartOffset: p.Next.StartOffset, + StopOffset: interval.StartOffset, + TsNs: p.Next.TsNs, + Value: p.Next.Value, + } + p.Next = t + if p != list.head { + t.Prev = p + } + t.Next = interval + interval.Prev = t + } else { + p.Next = interval + if p != list.head { + interval.Prev = p + } + } + + // right side + // interval ends before p.Prev + if q.Prev != nil && interval.StopOffset < q.Prev.StopOffset { + t := &Interval[T]{ + StartOffset: interval.StopOffset, + StopOffset: q.Prev.StopOffset, + TsNs: q.Prev.TsNs, + Value: q.Prev.Value, + } + q.Prev = t + if q != list.tail { + t.Next = q + } + interval.Next = t + t.Prev = interval + } else { + q.Prev = interval + if q != list.tail { + interval.Next = q + } + } + +} + +func (list *IntervalList[T]) Len() int { + list.Lock.Lock() + defer list.Lock.Unlock() + + var count int + for t := list.head; t != nil; t = t.Next { + count++ + } + return count - 1 +} diff --git a/weed/filer/interval_list_test.go b/weed/filer/interval_list_test.go new file mode 100644 index 000000000..dea510fed --- /dev/null +++ b/weed/filer/interval_list_test.go @@ -0,0 +1,327 @@ +package filer + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +type IntervalInt int + +func (i IntervalInt) SetStartStop(start, stop int64) { +} +func (i IntervalInt) Clone() IntervalValue { + return i +} + +func TestIntervalList_Overlay(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(0, 100, 1, 1) + list.Overlay(50, 150, 2, 2) + list.Overlay(200, 250, 3, 3) + list.Overlay(225, 250, 4, 4) + list.Overlay(175, 210, 5, 5) + list.Overlay(0, 25, 6, 6) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 6, list.Len()) + println() + list.Overlay(50, 150, 7, 7) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 6, list.Len()) +} + +func TestIntervalList_Overlay2(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(0, 50, 2, 2) + for p := list.Front(); p != 
nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } +} + +func TestIntervalList_Overlay3(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + assert.Equal(t, 1, list.Len()) + + list.Overlay(0, 60, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_Overlay4(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(0, 100, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 1, list.Len()) +} + +func TestIntervalList_Overlay5(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(0, 110, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 1, list.Len()) +} + +func TestIntervalList_Overlay6(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(50, 110, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 1, list.Len()) +} + +func TestIntervalList_Overlay7(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(50, 90, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_Overlay8(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(60, 90, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 3, list.Len()) +} + +func TestIntervalList_Overlay9(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(60, 100, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_Overlay10(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(50, 100, 1, 1) + list.Overlay(60, 110, 2, 2) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_Overlay11(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.Overlay(0, 100, 1, 1) + list.Overlay(100, 110, 2, 2) + list.Overlay(0, 90, 3, 3) + list.Overlay(0, 80, 4, 4) + list.Overlay(0, 90, 5, 5) + list.Overlay(90, 90, 6, 6) + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 3, list.Len()) +} + +func TestIntervalList_insertInterval1(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(200, 250, 3, 3) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_insertInterval2(t *testing.T) { + list := NewIntervalList[IntervalInt]() + 
list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(0, 25, 3, 3) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_insertInterval3(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(0, 75, 3, 3) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 3, list.Len()) +} + +func TestIntervalList_insertInterval4(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(0, 225, 3, 3) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_insertInterval5(t *testing.T) { + list := NewIntervalList[IntervalInt]() + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(0, 225, 5, 5) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_insertInterval6(t *testing.T) { + list := NewIntervalList[IntervalInt]() + + list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(0, 275, 1, 1) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 5, list.Len()) +} + +func TestIntervalList_insertInterval7(t *testing.T) { + list := NewIntervalList[IntervalInt]() + + list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(75, 275, 1, 1) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 4, list.Len()) +} + +func TestIntervalList_insertInterval8(t *testing.T) { + list := NewIntervalList[IntervalInt]() + + list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(75, 275, 3, 3) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 4, list.Len()) +} + +func TestIntervalList_insertInterval9(t *testing.T) { + list := NewIntervalList[IntervalInt]() + + list.InsertInterval(50, 150, 2, 2) + list.InsertInterval(200, 250, 4, 4) + + list.InsertInterval(50, 150, 3, 3) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 2, list.Len()) +} + +func TestIntervalList_insertInterval10(t *testing.T) { + list := NewIntervalList[IntervalInt]() + + list.InsertInterval(50, 100, 2, 2) + + list.InsertInterval(200, 300, 4, 4) + + list.InsertInterval(100, 200, 5, 5) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 3, list.Len()) +} + +func TestIntervalList_insertInterval11(t *testing.T) { + list := NewIntervalList[IntervalInt]() + + list.InsertInterval(0, 64, 1, 1) + + list.InsertInterval(72, 136, 3, 3) + + list.InsertInterval(64, 128, 2, 2) + + list.InsertInterval(68, 72, 4, 4) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", 
p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 4, list.Len()) +} + +type IntervalStruct struct { + x int + start int64 + stop int64 +} + +func newIntervalStruct(i int) IntervalStruct { + return IntervalStruct{ + x: i, + } +} + +func (i IntervalStruct) SetStartStop(start, stop int64) { + i.start, i.stop = start, stop +} +func (i IntervalStruct) Clone() IntervalValue { + return &IntervalStruct{ + x: i.x, + start: i.start, + stop: i.stop, + } +} + +func TestIntervalList_insertIntervalStruct(t *testing.T) { + list := NewIntervalList[IntervalStruct]() + + list.InsertInterval(0, 64, 1, newIntervalStruct(1)) + + list.InsertInterval(64, 72, 2, newIntervalStruct(2)) + + list.InsertInterval(72, 136, 3, newIntervalStruct(3)) + + list.InsertInterval(64, 68, 4, newIntervalStruct(4)) + + for p := list.Front(); p != nil; p = p.Next { + fmt.Printf("[%d,%d) %d %d\n", p.StartOffset, p.StopOffset, p.TsNs, p.Value) + } + assert.Equal(t, 4, list.Len()) +} diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 9d1fab20a..27e8f79a6 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -16,8 +16,7 @@ import ( type ChunkReadAt struct { masterClient *wdclient.MasterClient - chunkViews []*ChunkView - readerLock sync.Mutex + chunkViews *IntervalList[*ChunkView] fileSize int64 readerCache *ReaderCache readerPattern *ReaderPattern @@ -89,7 +88,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, @@ -108,44 +107,58 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { c.readerPattern.MonitorReadAt(offset, len(p)) - c.readerLock.Lock() - defer c.readerLock.Unlock() + c.chunkViews.Lock.Lock() + defer c.chunkViews.Lock.Unlock() + + // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) + n, _, err = c.doReadAt(p, offset) + return +} + +func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) { + + c.readerPattern.MonitorReadAt(offset, len(p)) + + c.chunkViews.Lock.Lock() + defer c.chunkViews.Lock.Unlock() // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) return c.doReadAt(p, offset) } -func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { +func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) { startOffset, remaining := offset, int64(len(p)) - var nextChunks []*ChunkView - for i, chunk := range c.chunkViews { + var nextChunks *Interval[*ChunkView] + for x := c.chunkViews.Front(); x != nil; x = x.Next { + chunk := x.Value if remaining <= 0 { break } - if i+1 < len(c.chunkViews) { - nextChunks = c.chunkViews[i+1:] + if x.Next != nil { + nextChunks = x.Next } - if startOffset < chunk.LogicOffset { - gap := chunk.LogicOffset - startOffset - glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset) + if startOffset < chunk.ViewOffset { + gap := chunk.ViewOffset - startOffset + glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset) n += zero(p, 
startOffset-offset, gap) - startOffset, remaining = chunk.LogicOffset, remaining-gap + startOffset, remaining = chunk.ViewOffset, remaining-gap if remaining <= 0 { break } } - // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) - chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining) + // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize)) + chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining) if chunkStart >= chunkStop { continue } - // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset + // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize)) + bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk + ts = chunk.ModifiedTsNs copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset)) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - return copied, err + return copied, ts, err } n += copied @@ -177,7 +190,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) { +func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) { if c.readerPattern.IsRandomMode() { n, err := c.readerCache.chunkCache.ReadChunkAt(buffer, chunkView.FileId, offset) @@ -187,16 +200,14 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset)) } - n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0) + n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.ViewOffset == 0) if c.lastChunkFid != chunkView.FileId { - if chunkView.Offset == 0 { // start of a new chunk + if chunkView.OffsetInChunk == 0 { // start of a new chunk if c.lastChunkFid != "" { c.readerCache.UnCache(c.lastChunkFid) - c.readerCache.MaybeCache(nextChunkViews) - } else { - if len(nextChunkViews) >= 1 { - c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning - } + } + if nextChunkViews != nil { + c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning } } } diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go index 29bd47ea4..f61d68a6d 100644 --- a/weed/filer/reader_at_test.go +++ b/weed/filer/reader_at_test.go @@ -5,7 +5,6 @@ import ( "io" "math" "strconv" - "sync" "testing" ) @@ -34,42 +33,40 @@ func (m *mockChunkCache) SetChunk(fileId string, data []byte) 
{ func TestReaderAt(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 1, - stop: 2, - fileId: "1", - chunkSize: 9, - }, - { - start: 3, - stop: 4, - fileId: "3", - chunkSize: 1, - }, - { - start: 5, - stop: 6, - fileId: "5", - chunkSize: 2, - }, - { - start: 7, - stop: 9, - fileId: "7", - chunkSize: 2, - }, - { - start: 9, - stop: 10, - fileId: "9", - chunkSize: 2, - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 1, + stop: 2, + fileId: "1", + chunkSize: 9, + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 3, + stop: 4, + fileId: "3", + chunkSize: 1, + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 5, + stop: 6, + fileId: "5", + chunkSize: 2, + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 7, + stop: 9, + fileId: "7", + chunkSize: 2, + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 9, + stop: 10, + fileId: "9", + chunkSize: 2, + }) readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - readerLock: sync.Mutex{}, fileSize: 10, readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), @@ -86,7 +83,7 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp if data == nil { data = make([]byte, size) } - n, err := readerAt.doReadAt(data, offset) + n, _, err := readerAt.doReadAt(data, offset) if expectedN != n { t.Errorf("unexpected read size: %d, expect: %d", n, expectedN) @@ -101,24 +98,22 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp func TestReaderAt0(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 2, - stop: 5, - fileId: "1", - chunkSize: 9, - }, - { - start: 7, - stop: 9, - fileId: "2", - chunkSize: 9, - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 2, + stop: 5, + fileId: "1", + chunkSize: 9, + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 7, + stop: 9, + fileId: "2", + chunkSize: 9, + }) readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - readerLock: sync.Mutex{}, fileSize: 10, readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), @@ -135,18 +130,16 @@ func TestReaderAt0(t *testing.T) { func TestReaderAt1(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 2, - stop: 5, - fileId: "1", - chunkSize: 9, - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 2, + stop: 5, + fileId: "1", + chunkSize: 9, + }) readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - readerLock: sync.Mutex{}, fileSize: 20, readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), @@ -164,24 +157,22 @@ func TestReaderAt1(t *testing.T) { } func TestReaderAtGappedChunksDoNotLeak(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 2, - stop: 3, - fileId: "1", - chunkSize: 5, - }, - { - start: 7, - stop: 9, - fileId: "1", - chunkSize: 4, - }, - } + visibles := NewIntervalList[*VisibleInterval]() + addVisibleInterval(visibles, &VisibleInterval{ + start: 2, + stop: 3, + fileId: "1", + chunkSize: 5, + }) + addVisibleInterval(visibles, &VisibleInterval{ + start: 7, + stop: 9, + fileId: "1", + chunkSize: 4, + }) readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - readerLock: sync.Mutex{}, 
fileSize: 9, readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), @@ -193,8 +184,7 @@ func TestReaderAtGappedChunksDoNotLeak(t *testing.T) { func TestReaderAtSparseFileDoesNotLeak(t *testing.T) { readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals([]VisibleInterval{}, 0, math.MaxInt64), - readerLock: sync.Mutex{}, + chunkViews: ViewFromVisibleIntervals(NewIntervalList[*VisibleInterval](), 0, math.MaxInt64), fileSize: 3, readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index cb89c03c5..0a7c83de7 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -43,7 +43,7 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn } } -func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { +func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { if rc.lookupFileIdFn == nil { return } @@ -55,7 +55,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { return } - for _, chunkView := range chunkViews { + for x := chunkViews; x != nil; x = x.Next { + chunkView := x.Value if _, found := rc.downloaders[chunkView.FileId]; found { continue } @@ -65,7 +66,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { return } - // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset) + // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset) // cache this chunk if not yet cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false) go cacher.startCaching() diff --git a/weed/filer/stream.go b/weed/filer/stream.go index f28341be4..d49784686 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -6,7 +6,6 @@ import ( "golang.org/x/exp/slices" "io" "math" - "sort" "strings" "sync" "time" @@ -78,7 +77,8 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w fileId2Url := make(map[string][]string) - for _, chunkView := range chunkViews { + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value var urlStrings []string var err error for _, backoff := range getLookupFileIdBackoffSchedule { @@ -102,29 +102,30 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs) remaining := size - for _, chunkView := range chunkViews { - if offset < chunkView.LogicOffset { - gap := chunkView.LogicOffset - offset + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + if offset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - offset remaining -= gap - glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset) + glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset) err := writeZero(writer, gap) if err != nil { - return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset) + return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset) } - offset = chunkView.LogicOffset + offset = chunkView.ViewOffset } urlStrings := fileId2Url[chunkView.FileId] start := time.Now() - err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) - offset += int64(chunkView.Size) - remaining -= int64(chunkView.Size) + err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, 
chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) + offset += int64(chunkView.ViewSize) + remaining -= int64(chunkView.ViewSize) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() return fmt.Errorf("read chunk: %v", err) } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() - downloadThrottler.MaybeSlowdown(int64(chunkView.Size)) + downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize)) } if remaining > 0 { glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) @@ -167,14 +168,15 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer idx := 0 - for _, chunkView := range chunkViews { + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value urlStrings, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } - n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset) + n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) if err != nil { return err } @@ -185,7 +187,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView + chunkView *Interval[*ChunkView] totalSize int64 logicOffset int64 buffer []byte @@ -201,17 +203,15 @@ var _ = io.ReaderAt(&ChunkStreamReader{}) func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - slices.SortFunc(chunkViews, func(a, b *ChunkView) bool { - return a.LogicOffset < b.LogicOffset - }) var totalSize int64 - for _, chunk := range chunkViews { - totalSize += int64(chunk.Size) + for x := chunkViews.Front(); x != nil; x = x.Next { + chunk := x.Value + totalSize += int64(chunk.ViewSize) } return &ChunkStreamReader{ - chunkViews: chunkViews, + chunkView: chunkViews.Front(), lookupFileId: lookupFileIdFn, totalSize: totalSize, } @@ -290,7 +290,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { } func insideChunk(offset int64, chunk *ChunkView) bool { - return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) + return chunk.ViewOffset <= offset && offset < chunk.ViewOffset+int64(chunk.ViewSize) } func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { @@ -300,48 +300,22 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { } // fmt.Printf("fetch for offset %d\n", offset) - - // need to seek to a different chunk - currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { - return offset < c.chunkViews[i].LogicOffset - }) - if currentChunkIndex == len(c.chunkViews) { - // not found - if insideChunk(offset, c.chunkViews[0]) { - // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) - currentChunkIndex = 0 - } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) { - currentChunkIndex = len(c.chunkViews) - 1 - // fmt.Printf("select last chunk %d %s\n", 
currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) - } else { - return io.EOF - } - } else if currentChunkIndex > 0 { - if insideChunk(offset, c.chunkViews[currentChunkIndex]) { - // good hit - } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) { - currentChunkIndex -= 1 - // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) - } else { - // glog.Fatalf("unexpected1 offset %d", offset) - return fmt.Errorf("unexpected1 offset %d", offset) - } - } else { - // glog.Fatalf("unexpected2 offset %d", offset) - return fmt.Errorf("unexpected2 offset %d", offset) + c.chunkView = c.chunkView.Next + if c.chunkView == nil { + return io.EOF } // positioning within the new chunk - chunk := c.chunkViews[currentChunkIndex] + chunk := c.chunkView.Value if insideChunk(offset, chunk) { - if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + if c.isBufferEmpty() || c.bufferOffset != chunk.ViewOffset { if err = c.fetchChunkToBuffer(chunk); err != nil { return } } } else { - // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) - return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) + // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize)) + return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize)) return } @@ -355,7 +329,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { buffer.Write(data) }) if !shouldRetry { @@ -372,10 +346,10 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return err } c.buffer = buffer.Bytes() - c.bufferOffset = chunkView.LogicOffset + c.bufferOffset = chunkView.ViewOffset c.chunk = chunkView.FileId - // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize)) return nil }
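Taken together, the reader_at.go, reader_cache.go, and stream.go hunks above apply one mechanical transformation: every []*ChunkView slice walk becomes a traversal of the new IntervalList, and the renamed ChunkView fields state their coordinate space explicitly. ViewOffset and ViewSize locate the view inside the logical file, OffsetInChunk is the read position inside the stored chunk, and ModifiedTsNs lets ReadAtWithTime report the newest timestamp a read touched; MaybeCache now receives the next interval node and prefetches forward from it. Below is a minimal sketch of that traversal shape, not part of the diff: printChunkLayout is a hypothetical helper, and its zero-fill branch mirrors the gap handling in doReadAt and StreamContentWithThrottler.

package filer

import "fmt"

// Sketch: walk an IntervalList[*ChunkView] the way the refactored readers
// do, zero-filling the holes between consecutive views of a sparse file.
func printChunkLayout(chunkViews *IntervalList[*ChunkView]) {
	prevStop := int64(0)
	for x := chunkViews.Front(); x != nil; x = x.Next {
		chunk := x.Value
		if prevStop < chunk.ViewOffset {
			// a hole between views: readers emit zeros for this range
			fmt.Printf("zero  [%d,%d)\n", prevStop, chunk.ViewOffset)
		}
		fmt.Printf("chunk %s file[%d,%d) offsetInChunk=%d\n", chunk.FileId,
			chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize), chunk.OffsetInChunk)
		prevStop = chunk.ViewOffset + int64(chunk.ViewSize)
	}
}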

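The interval_list_test.go cases earlier in this diff also pin down the two write semantics of the new list. Overlay unconditionally claims its [start, stop) range and truncates or splits whatever it covers, while InsertInterval claims only the sub-ranges where its TsNs is newer, so an older insert can merely fill gaps (compare TestIntervalList_insertInterval6, where an older full-range insert survives only in the three uncovered segments, leaving five in total). A small sketch contrasting the two, not part of the diff: the test name is hypothetical, and it reuses the IntervalInt helper from those tests.

package filer

import "testing"

// Sketch: Overlay always wins over existing intervals; InsertInterval wins
// only where its timestamp is newer than what is already stored.
func TestIntervalSemanticsSketch(t *testing.T) {
	overlaid := NewIntervalList[IntervalInt]()
	overlaid.Overlay(50, 100, 1, 1) // [50,100) ts=1
	overlaid.Overlay(0, 60, 2, 2)   // replaces [50,60): now [0,60) ts=2, [60,100) ts=1
	if overlaid.Len() != 2 {        // matches TestIntervalList_Overlay3
		t.Errorf("want 2 intervals, got %d", overlaid.Len())
	}

	inserted := NewIntervalList[IntervalInt]()
	inserted.InsertInterval(200, 250, 4, 4) // ts=4
	inserted.InsertInterval(0, 225, 3, 3)   // older ts=3 keeps only the uncovered [0,200)
	if inserted.Len() != 2 {                // matches TestIntervalList_insertInterval4
		t.Errorf("want 2 intervals, got %d", inserted.Len())
	}
}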