about summary refs log tree commit diff
path: root/weed/filer2/filechunks.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filechunks.go')
-rw-r--r--  weed/filer2/filechunks.go  228
1 files changed, 91 insertions, 137 deletions
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index b248a8c59..711488df1 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -3,8 +3,8 @@ package filer2
import (
"fmt"
"hash/fnv"
- "math"
"sort"
+ "sync"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
@@ -33,14 +33,14 @@ func ETag(chunks []*filer_pb.FileChunk) (etag string) {
func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
- visibles := nonOverlappingVisibleIntervals(chunks)
+ visibles := NonOverlappingVisibleIntervals(chunks)
fileIds := make(map[string]bool)
for _, interval := range visibles {
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
- if found := fileIds[chunk.FileId]; found {
+ if _, found := fileIds[chunk.GetFileIdString()]; found {
compacted = append(compacted, chunk)
} else {
garbage = append(garbage, chunk)
@@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
return
}
-func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
+func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
- for _, interval := range newChunks {
- fileIds[interval.FileId] = true
+ for _, interval := range bs {
+ fileIds[interval.GetFileIdString()] = true
}
- for _, chunk := range oldChunks {
- if found := fileIds[chunk.FileId]; !found {
- unused = append(unused, chunk)
+ for _, chunk := range as {
+ if _, found := fileIds[chunk.GetFileIdString()]; !found {
+ delta = append(delta, chunk)
}
}
@@ -70,21 +70,35 @@ type ChunkView struct {
Offset int64
Size uint64
LogicOffset int64
+ IsFullChunk bool
+ CipherKey []byte
+ isGzipped bool
}
func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
- visibles := nonOverlappingVisibleIntervals(chunks)
+ visibles := NonOverlappingVisibleIntervals(chunks)
+
+ return ViewFromVisibleIntervals(visibles, offset, size)
+
+}
+
+func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) {
stop := offset + int64(size)
for _, chunk := range visibles {
+
if chunk.start <= offset && offset < chunk.stop && offset < stop {
+ isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
views = append(views, &ChunkView{
FileId: chunk.fileId,
Offset: offset - chunk.start, // offset is the data starting location in this file id
Size: uint64(min(chunk.stop, stop) - offset),
LogicOffset: offset,
+ IsFullChunk: isFullChunk,
+ CipherKey: chunk.cipherKey,
+ isGzipped: chunk.isGzipped,
})
offset = min(chunk.stop, stop)
}
@@ -94,7 +108,7 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views
}
-func logPrintf(name string, visibles []*visibleInterval) {
+func logPrintf(name string, visibles []VisibleInterval) {
/*
log.Printf("%s len %d", name, len(visibles))
for _, v := range visibles {
@@ -103,152 +117,99 @@ func logPrintf(name string, visibles []*visibleInterval) {
*/
}
-func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) {
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return new(VisibleInterval)
+ },
+}
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Offset < chunks[j].Offset {
- return true
- }
- if chunks[i].Offset == chunks[j].Offset {
- return chunks[i].Mtime < chunks[j].Mtime
- }
- return false
- })
+func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
- if len(chunks) == 0 {
- return
- }
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped)
- var parallelIntervals, intervals []*visibleInterval
- var minStopInterval, upToDateInterval *visibleInterval
- watermarkStart := chunks[0].Offset
- for _, chunk := range chunks {
- // log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
- logPrintf("parallelIntervals", parallelIntervals)
- for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
- logPrintf("parallelIntervals loop 1", parallelIntervals)
- logPrintf("parallelIntervals loop 1 intervals", intervals)
- minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
- nextStop := min(minStopInterval.stop, chunk.Offset)
- intervals = append(intervals, newVisibleInterval(
- max(watermarkStart, minStopInterval.start),
- nextStop,
- upToDateInterval.fileId,
- upToDateInterval.modifiedTime,
- ))
- watermarkStart = nextStop
- logPrintf("parallelIntervals loop intervals =>", intervals)
-
- // remove processed intervals, possibly multiple
- var remaining []*visibleInterval
- for _, interval := range parallelIntervals {
- if interval.stop != watermarkStart {
- remaining = append(remaining, interval)
- }
- }
- parallelIntervals = remaining
- logPrintf("parallelIntervals loop 2", parallelIntervals)
- logPrintf("parallelIntervals loop 2 intervals", intervals)
- }
- parallelIntervals = append(parallelIntervals, newVisibleInterval(
- chunk.Offset,
- chunk.Offset+int64(chunk.Size),
- chunk.FileId,
- chunk.Mtime,
- ))
+ length := len(visibles)
+ if length == 0 {
+ return append(visibles, newV)
+ }
+ last := visibles[length-1]
+ if last.stop <= chunk.Offset {
+ return append(visibles, newV)
}
- logPrintf("parallelIntervals loop 3", parallelIntervals)
- logPrintf("parallelIntervals loop 3 intervals", intervals)
- for len(parallelIntervals) > 0 {
- minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
- intervals = append(intervals, newVisibleInterval(
- max(watermarkStart, minStopInterval.start),
- minStopInterval.stop,
- upToDateInterval.fileId,
- upToDateInterval.modifiedTime,
- ))
- watermarkStart = minStopInterval.stop
-
- // remove processed intervals, possibly multiple
- var remaining []*visibleInterval
- for _, interval := range parallelIntervals {
- if interval.stop != watermarkStart {
- remaining = append(remaining, interval)
- }
+ logPrintf(" before", visibles)
+ for _, v := range visibles {
+ if v.start < chunk.Offset && chunk.Offset < v.stop {
+ newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
}
- parallelIntervals = remaining
- }
- logPrintf("parallelIntervals loop 4", parallelIntervals)
- logPrintf("intervals", intervals)
-
- // merge connected intervals, now the intervals are non-intersecting
- var lastIntervalIndex int
- var prevIntervalIndex int
- for i, interval := range intervals {
- if i == 0 {
- prevIntervalIndex = i
- lastIntervalIndex = i
- continue
+ chunkStop := chunk.Offset + int64(chunk.Size)
+ if v.start < chunkStop && chunkStop < v.stop {
+ newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
}
- if intervals[i-1].fileId != interval.fileId ||
- intervals[i-1].stop < intervals[i].start {
- visibles = append(visibles, newVisibleInterval(
- intervals[prevIntervalIndex].start,
- intervals[i-1].stop,
- intervals[prevIntervalIndex].fileId,
- intervals[prevIntervalIndex].modifiedTime,
- ))
- prevIntervalIndex = i
+ if chunkStop <= v.start || v.stop <= chunk.Offset {
+ newVisibles = append(newVisibles, v)
}
- lastIntervalIndex = i
- logPrintf("intervals loop 1 visibles", visibles)
}
+ newVisibles = append(newVisibles, newV)
- visibles = append(visibles, newVisibleInterval(
- intervals[prevIntervalIndex].start,
- intervals[lastIntervalIndex].stop,
- intervals[prevIntervalIndex].fileId,
- intervals[prevIntervalIndex].modifiedTime,
- ))
+ logPrintf(" append", newVisibles)
- logPrintf("visibles", visibles)
+ for i := len(newVisibles) - 1; i >= 0; i-- {
+ if i > 0 && newV.start < newVisibles[i-1].start {
+ newVisibles[i] = newVisibles[i-1]
+ } else {
+ newVisibles[i] = newV
+ break
+ }
+ }
+ logPrintf(" sorted", newVisibles)
- return
+ return newVisibles
}
-func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) {
- var latestMtime int64
- latestIntervalIndex := 0
- minStop := int64(math.MaxInt64)
- minIntervalIndex := 0
- for i, interval := range intervals {
- if minStop > interval.stop {
- minIntervalIndex = i
- minStop = interval.stop
- }
- if latestMtime < interval.modifiedTime {
- latestMtime = interval.modifiedTime
- latestIntervalIndex = i
- }
+func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+
+ sort.Slice(chunks, func(i, j int) bool {
+ return chunks[i].Mtime < chunks[j].Mtime
+ })
+
+ var newVisibles []VisibleInterval
+ for _, chunk := range chunks {
+
+ newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
+ t := visibles[:0]
+ visibles = newVisibles
+ newVisibles = t
+
+ logPrintf("add", visibles)
+
}
- minStopInterval = intervals[minIntervalIndex]
- upToDateInterval = intervals[latestIntervalIndex]
+
return
}
// find non-overlapping visible intervals
// visible interval map to one file chunk
-type visibleInterval struct {
+type VisibleInterval struct {
start int64
stop int64
modifiedTime int64
fileId string
+ isFullChunk bool
+ cipherKey []byte
+ isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
- return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime}
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval {
+ return VisibleInterval{
+ start: start,
+ stop: stop,
+ fileId: fileId,
+ modifiedTime: modifiedTime,
+ isFullChunk: isFullChunk,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ }
}
func min(x, y int64) int64 {
@@ -257,10 +218,3 @@ func min(x, y int64) int64 {
}
return y
}
-
-func max(x, y int64) int64 {
- if x > y {
- return x
- }
- return y
-}