aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filechunks.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filechunks.go')
-rw-r--r--weed/filer2/filechunks.go245
1 files changed, 237 insertions, 8 deletions
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index b2f05de3a..0ac7bb43b 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -1,8 +1,13 @@
package filer2
-type Chunks []FileChunk
+import (
+ "math"
+ "sort"
-func (chunks Chunks) TotalSize() (size uint64) {
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
for _, c := range chunks {
t := uint64(c.Offset + int64(c.Size))
if size < t {
@@ -12,12 +17,236 @@ func (chunks Chunks) TotalSize() (size uint64) {
return
}
-func (chunks Chunks) Len() int {
- return len(chunks)
+func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+
+ visibles := nonOverlappingVisibleIntervals(chunks)
+
+ fileIds := make(map[string]bool)
+ for _, interval := range visibles {
+ fileIds[interval.fileId] = true
+ }
+ for _, chunk := range chunks {
+ if found := fileIds[chunk.FileId]; found {
+ compacted = append(compacted, chunk)
+ } else {
+ garbage = append(garbage, chunk)
+ }
+ }
+
+ return
+}
+
+func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
+
+ fileIds := make(map[string]bool)
+ for _, interval := range newChunks {
+ fileIds[interval.FileId] = true
+ }
+ for _, chunk := range oldChunks {
+ if found := fileIds[chunk.FileId]; !found {
+ unused = append(unused, chunk)
+ }
+ }
+
+ return
+}
+
+type ChunkView struct {
+ FileId string
+ Offset int64
+ Size uint64
+ LogicOffset int64
+}
+
+func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
+
+ visibles := nonOverlappingVisibleIntervals(chunks)
+
+ stop := offset + int64(size)
+
+ for _, chunk := range visibles {
+ if chunk.start <= offset && offset < chunk.stop && offset < stop {
+ views = append(views, &ChunkView{
+ FileId: chunk.fileId,
+ Offset: offset - chunk.start, // offset is the data starting location in this file id
+ Size: uint64(min(chunk.stop, stop) - offset),
+ LogicOffset: offset,
+ })
+ offset = min(chunk.stop, stop)
+ }
+ }
+
+ return views
+
+}
+
+func logPrintf(name string, visibles []*visibleInterval) {
+ /*
+ log.Printf("%s len %d", name, len(visibles))
+ for _, v := range visibles {
+ log.Printf("%s: => %+v", name, v)
+ }
+ */
+}
+
+func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) {
+
+ sort.Slice(chunks, func(i, j int) bool {
+ if chunks[i].Offset < chunks[j].Offset {
+ return true
+ }
+ if chunks[i].Offset == chunks[j].Offset {
+ return chunks[i].Mtime < chunks[j].Mtime
+ }
+ return false
+ })
+
+ if len(chunks) == 0 {
+ return
+ }
+
+ var parallelIntervals, intervals []*visibleInterval
+ var minStopInterval, upToDateInterval *visibleInterval
+ watermarkStart := chunks[0].Offset
+ for _, chunk := range chunks {
+ // log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
+ logPrintf("parallelIntervals", parallelIntervals)
+ for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
+ logPrintf("parallelIntervals loop 1", parallelIntervals)
+ logPrintf("parallelIntervals loop 1 intervals", intervals)
+ minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
+ nextStop := min(minStopInterval.stop, chunk.Offset)
+ intervals = append(intervals, newVisibleInterval(
+ max(watermarkStart, minStopInterval.start),
+ nextStop,
+ upToDateInterval.fileId,
+ upToDateInterval.modifiedTime,
+ ))
+ watermarkStart = nextStop
+ logPrintf("parallelIntervals loop intervals =>", intervals)
+
+ // remove processed intervals, possibly multiple
+ var remaining []*visibleInterval
+ for _, interval := range parallelIntervals {
+ if interval.stop != watermarkStart {
+ remaining = append(remaining, interval)
+ }
+ }
+ parallelIntervals = remaining
+ logPrintf("parallelIntervals loop 2", parallelIntervals)
+ logPrintf("parallelIntervals loop 2 intervals", intervals)
+ }
+ parallelIntervals = append(parallelIntervals, newVisibleInterval(
+ chunk.Offset,
+ chunk.Offset+int64(chunk.Size),
+ chunk.FileId,
+ chunk.Mtime,
+ ))
+ }
+
+ logPrintf("parallelIntervals loop 3", parallelIntervals)
+ logPrintf("parallelIntervals loop 3 intervals", intervals)
+ for len(parallelIntervals) > 0 {
+ minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
+ intervals = append(intervals, newVisibleInterval(
+ max(watermarkStart, minStopInterval.start),
+ minStopInterval.stop,
+ upToDateInterval.fileId,
+ upToDateInterval.modifiedTime,
+ ))
+ watermarkStart = minStopInterval.stop
+
+ // remove processed intervals, possibly multiple
+ var remaining []*visibleInterval
+ for _, interval := range parallelIntervals {
+ if interval.stop != watermarkStart {
+ remaining = append(remaining, interval)
+ }
+ }
+ parallelIntervals = remaining
+ }
+ logPrintf("parallelIntervals loop 4", parallelIntervals)
+ logPrintf("intervals", intervals)
+
+ // merge connected intervals, now the intervals are non-intersecting
+ var lastIntervalIndex int
+ var prevIntervalIndex int
+ for i, interval := range intervals {
+ if i == 0 {
+ prevIntervalIndex = i
+ lastIntervalIndex = i
+ continue
+ }
+ if intervals[i-1].fileId != interval.fileId ||
+ intervals[i-1].stop < intervals[i].start {
+ visibles = append(visibles, newVisibleInterval(
+ intervals[prevIntervalIndex].start,
+ intervals[i-1].stop,
+ intervals[prevIntervalIndex].fileId,
+ intervals[prevIntervalIndex].modifiedTime,
+ ))
+ prevIntervalIndex = i
+ }
+ lastIntervalIndex = i
+ logPrintf("intervals loop 1 visibles", visibles)
+ }
+
+ visibles = append(visibles, newVisibleInterval(
+ intervals[prevIntervalIndex].start,
+ intervals[lastIntervalIndex].stop,
+ intervals[prevIntervalIndex].fileId,
+ intervals[prevIntervalIndex].modifiedTime,
+ ))
+
+ logPrintf("visibles", visibles)
+
+ return
+}
+
+func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) {
+ var latestMtime int64
+ latestIntervalIndex := 0
+ minStop := int64(math.MaxInt64)
+ minIntervalIndex := 0
+ for i, interval := range intervals {
+ if minStop > interval.stop {
+ minIntervalIndex = i
+ minStop = interval.stop
+ }
+ if latestMtime < interval.modifiedTime {
+ latestMtime = interval.modifiedTime
+ latestIntervalIndex = i
+ }
+ }
+ minStopInterval = intervals[minIntervalIndex]
+ upToDateInterval = intervals[latestIntervalIndex]
+ return
}
-func (chunks Chunks) Swap(i, j int) {
- chunks[i], chunks[j] = chunks[j], chunks[i]
+
+// find non-overlapping visible intervals
+// visible interval map to one file chunk
+
+type visibleInterval struct {
+ start int64
+ stop int64
+ modifiedTime int64
+ fileId string
+}
+
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
+ return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime}
+}
+
+func min(x, y int64) int64 {
+ if x <= y {
+ return x
+ }
+ return y
}
-func (chunks Chunks) Less(i, j int) bool {
- return chunks[i].Offset < chunks[j].Offset
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
}