diff options
Diffstat (limited to 'weed/filer2/filechunks.go')
| -rw-r--r-- | weed/filer2/filechunks.go | 228 |
1 file changed, 91 insertions(+), 137 deletions(-)
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index b248a8c59..711488df1 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -3,8 +3,8 @@ package filer2 import ( "fmt" "hash/fnv" - "math" "sort" + "sync" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) @@ -33,14 +33,14 @@ func ETag(chunks []*filer_pb.FileChunk) (etag string) { func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - visibles := nonOverlappingVisibleIntervals(chunks) + visibles := NonOverlappingVisibleIntervals(chunks) fileIds := make(map[string]bool) for _, interval := range visibles { fileIds[interval.fileId] = true } for _, chunk := range chunks { - if found := fileIds[chunk.FileId]; found { + if _, found := fileIds[chunk.GetFileIdString()]; found { compacted = append(compacted, chunk) } else { garbage = append(garbage, chunk) @@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file return } -func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) { +func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { fileIds := make(map[string]bool) - for _, interval := range newChunks { - fileIds[interval.FileId] = true + for _, interval := range bs { + fileIds[interval.GetFileIdString()] = true } - for _, chunk := range oldChunks { - if found := fileIds[chunk.FileId]; !found { - unused = append(unused, chunk) + for _, chunk := range as { + if _, found := fileIds[chunk.GetFileIdString()]; !found { + delta = append(delta, chunk) } } @@ -70,21 +70,35 @@ type ChunkView struct { Offset int64 Size uint64 LogicOffset int64 + IsFullChunk bool + CipherKey []byte + isGzipped bool } func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) { - visibles := nonOverlappingVisibleIntervals(chunks) + visibles := NonOverlappingVisibleIntervals(chunks) + + return 
ViewFromVisibleIntervals(visibles, offset, size) + +} + +func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) { stop := offset + int64(size) for _, chunk := range visibles { + if chunk.start <= offset && offset < chunk.stop && offset < stop { + isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop views = append(views, &ChunkView{ FileId: chunk.fileId, Offset: offset - chunk.start, // offset is the data starting location in this file id Size: uint64(min(chunk.stop, stop) - offset), LogicOffset: offset, + IsFullChunk: isFullChunk, + CipherKey: chunk.cipherKey, + isGzipped: chunk.isGzipped, }) offset = min(chunk.stop, stop) } @@ -94,7 +108,7 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views } -func logPrintf(name string, visibles []*visibleInterval) { +func logPrintf(name string, visibles []VisibleInterval) { /* log.Printf("%s len %d", name, len(visibles)) for _, v := range visibles { @@ -103,152 +117,99 @@ func logPrintf(name string, visibles []*visibleInterval) { */ } -func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) { +var bufPool = sync.Pool{ + New: func() interface{} { + return new(VisibleInterval) + }, +} - sort.Slice(chunks, func(i, j int) bool { - if chunks[i].Offset < chunks[j].Offset { - return true - } - if chunks[i].Offset == chunks[j].Offset { - return chunks[i].Mtime < chunks[j].Mtime - } - return false - }) +func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { - if len(chunks) == 0 { - return - } + newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped) - var parallelIntervals, intervals []*visibleInterval - var minStopInterval, upToDateInterval *visibleInterval - watermarkStart := chunks[0].Offset - for _, chunk := range chunks { - // 
log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) - logPrintf("parallelIntervals", parallelIntervals) - for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset { - logPrintf("parallelIntervals loop 1", parallelIntervals) - logPrintf("parallelIntervals loop 1 intervals", intervals) - minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals) - nextStop := min(minStopInterval.stop, chunk.Offset) - intervals = append(intervals, newVisibleInterval( - max(watermarkStart, minStopInterval.start), - nextStop, - upToDateInterval.fileId, - upToDateInterval.modifiedTime, - )) - watermarkStart = nextStop - logPrintf("parallelIntervals loop intervals =>", intervals) - - // remove processed intervals, possibly multiple - var remaining []*visibleInterval - for _, interval := range parallelIntervals { - if interval.stop != watermarkStart { - remaining = append(remaining, interval) - } - } - parallelIntervals = remaining - logPrintf("parallelIntervals loop 2", parallelIntervals) - logPrintf("parallelIntervals loop 2 intervals", intervals) - } - parallelIntervals = append(parallelIntervals, newVisibleInterval( - chunk.Offset, - chunk.Offset+int64(chunk.Size), - chunk.FileId, - chunk.Mtime, - )) + length := len(visibles) + if length == 0 { + return append(visibles, newV) + } + last := visibles[length-1] + if last.stop <= chunk.Offset { + return append(visibles, newV) } - logPrintf("parallelIntervals loop 3", parallelIntervals) - logPrintf("parallelIntervals loop 3 intervals", intervals) - for len(parallelIntervals) > 0 { - minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals) - intervals = append(intervals, newVisibleInterval( - max(watermarkStart, minStopInterval.start), - minStopInterval.stop, - upToDateInterval.fileId, - upToDateInterval.modifiedTime, - )) - watermarkStart = minStopInterval.stop - - // remove processed intervals, possibly multiple - var remaining []*visibleInterval - for _, interval := 
range parallelIntervals { - if interval.stop != watermarkStart { - remaining = append(remaining, interval) - } + logPrintf(" before", visibles) + for _, v := range visibles { + if v.start < chunk.Offset && chunk.Offset < v.stop { + newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped)) } - parallelIntervals = remaining - } - logPrintf("parallelIntervals loop 4", parallelIntervals) - logPrintf("intervals", intervals) - - // merge connected intervals, now the intervals are non-intersecting - var lastIntervalIndex int - var prevIntervalIndex int - for i, interval := range intervals { - if i == 0 { - prevIntervalIndex = i - lastIntervalIndex = i - continue + chunkStop := chunk.Offset + int64(chunk.Size) + if v.start < chunkStop && chunkStop < v.stop { + newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped)) } - if intervals[i-1].fileId != interval.fileId || - intervals[i-1].stop < intervals[i].start { - visibles = append(visibles, newVisibleInterval( - intervals[prevIntervalIndex].start, - intervals[i-1].stop, - intervals[prevIntervalIndex].fileId, - intervals[prevIntervalIndex].modifiedTime, - )) - prevIntervalIndex = i + if chunkStop <= v.start || v.stop <= chunk.Offset { + newVisibles = append(newVisibles, v) } - lastIntervalIndex = i - logPrintf("intervals loop 1 visibles", visibles) } + newVisibles = append(newVisibles, newV) - visibles = append(visibles, newVisibleInterval( - intervals[prevIntervalIndex].start, - intervals[lastIntervalIndex].stop, - intervals[prevIntervalIndex].fileId, - intervals[prevIntervalIndex].modifiedTime, - )) + logPrintf(" append", newVisibles) - logPrintf("visibles", visibles) + for i := len(newVisibles) - 1; i >= 0; i-- { + if i > 0 && newV.start < newVisibles[i-1].start { + newVisibles[i] = newVisibles[i-1] + } else { + newVisibles[i] = newV + break + } + } + logPrintf(" sorted", 
newVisibles) - return + return newVisibles } -func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) { - var latestMtime int64 - latestIntervalIndex := 0 - minStop := int64(math.MaxInt64) - minIntervalIndex := 0 - for i, interval := range intervals { - if minStop > interval.stop { - minIntervalIndex = i - minStop = interval.stop - } - if latestMtime < interval.modifiedTime { - latestMtime = interval.modifiedTime - latestIntervalIndex = i - } +func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { + + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].Mtime < chunks[j].Mtime + }) + + var newVisibles []VisibleInterval + for _, chunk := range chunks { + + newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk) + t := visibles[:0] + visibles = newVisibles + newVisibles = t + + logPrintf("add", visibles) + } - minStopInterval = intervals[minIntervalIndex] - upToDateInterval = intervals[latestIntervalIndex] + return } // find non-overlapping visible intervals // visible interval map to one file chunk -type visibleInterval struct { +type VisibleInterval struct { start int64 stop int64 modifiedTime int64 fileId string + isFullChunk bool + cipherKey []byte + isGzipped bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval { - return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime} +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval { + return VisibleInterval{ + start: start, + stop: stop, + fileId: fileId, + modifiedTime: modifiedTime, + isFullChunk: isFullChunk, + cipherKey: cipherKey, + isGzipped: isGzipped, + } } func min(x, y int64) int64 { @@ -257,10 +218,3 @@ func min(x, y int64) int64 { } return y } - -func max(x, y int64) int64 { - if x > y { - return x - } - return y -} |
