aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/filer/filechunks.go23
-rw-r--r--weed/filer/filechunks_read.go119
-rw-r--r--weed/filer/filechunks_read_test.go119
3 files changed, 261 insertions, 0 deletions
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 0dc03f6e2..be18d45ac 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -225,6 +225,12 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
+ visibles2 := readResolvedChunks(chunks)
+
+ if true {
+ return visibles2, err
+ }
+
sort.Slice(chunks, func(i, j int) bool {
if chunks[i].Mtime == chunks[j].Mtime {
filer_pb.EnsureFid(chunks[i])
@@ -246,9 +252,26 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction
}
+ if len(visibles) != len(visibles2) {
+ fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2))
+ } else {
+ for i := 0; i < len(visibles); i++ {
+ checkDifference(visibles[i], visibles2[i])
+ }
+ }
+
return
}
+// checkDifference prints a diagnostic line when the two visible
+// intervals disagree on any of start, stop, fileId, or modifiedTime.
+// It is debug scaffolding for comparing the two resolution algorithms.
+func checkDifference(x, y VisibleInterval) {
+	same := x.start == y.start &&
+		x.stop == y.stop &&
+		x.fileId == y.fileId &&
+		x.modifiedTime == y.modifiedTime
+	if !same {
+		fmt.Printf("different visible %+v : %+v\n", x, y)
+	}
+}
+
// find non-overlapping visible intervals
// visible interval map to one file chunk
diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go
new file mode 100644
index 000000000..b39f6a35c
--- /dev/null
+++ b/weed/filer/filechunks_read.go
@@ -0,0 +1,119 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "sort"
+)
+
+// readResolvedChunks resolves overlapping chunks into non-overlapping
+// visible intervals with a sweep-line over the file offset axis: each
+// chunk contributes a start event and a stop event, and at any offset
+// the chunk with the largest mtime is the visible one.
+func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+
+ // Build the event list: one start point and one stop point per chunk.
+ var points []*Point
+ for _, chunk := range chunks {
+ points = append(points, &Point{
+ x: chunk.Offset,
+ ts: chunk.Mtime,
+ chunk: chunk,
+ isStart: true,
+ })
+ points = append(points, &Point{
+ x: chunk.Offset + int64(chunk.Size),
+ ts: chunk.Mtime,
+ chunk: chunk,
+ isStart: false,
+ })
+ }
+ // Order events by offset, then by mtime; on a tie a stop event sorts
+ // before a start event so touching chunks do not appear to overlap.
+ sort.Slice(points, func(i, j int) bool {
+ if points[i].x != points[j].x {
+ return points[i].x < points[j].x
+ }
+ if points[i].ts != points[j].ts {
+ return points[i].ts < points[j].ts
+ }
+ if !points[i].isStart {
+ return true
+ }
+ return false
+ })
+
+ // queue holds the chunks covering the current sweep position, kept in
+ // ascending ts order, so the tail entry is the newest (visible) chunk.
+ // prevX is the offset where the currently visible chunk became visible.
+ var prevX int64
+ var queue []*Point
+ for _, point := range points {
+ if point.isStart {
+ if len(queue) > 0 {
+ lastIndex := len(queue) -1
+ lastPoint := queue[lastIndex]
+ // A newer chunk begins here: close out the visible stretch
+ // [prevX, point.x) of the chunk it is about to cover.
+ if point.x != prevX && lastPoint.ts < point.ts {
+ visibles = addToVisibles(visibles, prevX, lastPoint, point)
+ prevX = point.x
+ }
+ }
+ // insert into queue, preserving ascending ts order
+ for i := len(queue); i >= 0; i-- {
+ if i == 0 || queue[i-1].ts <= point.ts {
+ if i == len(queue) {
+ // Inserted at the tail: this chunk is now the visible one.
+ prevX = point.x
+ }
+ queue = addToQueue(queue, i, point)
+ break
+ }
+ }
+ } else {
+ lastIndex := len(queue) - 1
+ index := lastIndex
+ var startPoint *Point
+ // Locate the queued chunk this stop event belongs to by matching ts.
+ for ; index >= 0; index-- {
+ startPoint = queue[index]
+ if startPoint.ts == point.ts {
+ queue = removeFromQueue(queue, index)
+ break
+ }
+ }
+ // Emit an interval only if the ending chunk was the visible one,
+ // i.e. it sat at the tail of the queue; covered chunks end silently.
+ if index == lastIndex && startPoint != nil {
+ visibles = addToVisibles(visibles, prevX, startPoint, point)
+ prevX = point.x
+ }
+ }
+ }
+
+ return
+}
+
+// removeFromQueue deletes the point at index, preserving the order of
+// the remaining entries, and returns the shortened slice.
+func removeFromQueue(queue []*Point, index int) []*Point {
+	// Idiomatic ordered deletion: append shifts the tail left in a
+	// single memmove instead of an element-by-element loop.
+	return append(queue[:index], queue[index+1:]...)
+}
+
+func addToQueue(queue []*Point, index int, point *Point) []*Point {
+ queue = append(queue, point)
+ for i := len(queue) - 1; i > index; i-- {
+ queue[i], queue[i-1] = queue[i-1], queue[i]
+ }
+ return queue
+}
+
+// addToVisibles appends the visible interval [prevX, point.x) served by
+// startPoint's chunk; an empty or inverted range contributes nothing.
+func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval {
+	if prevX >= point.x {
+		return visibles
+	}
+	chunk := startPoint.chunk
+	return append(visibles, VisibleInterval{
+		start:        prevX,
+		stop:         point.x,
+		fileId:       chunk.FileId,
+		modifiedTime: chunk.Mtime,
+		chunkOffset:  prevX - chunk.Offset,
+		chunkSize:    chunk.Size,
+		cipherKey:    chunk.CipherKey,
+		isGzipped:    chunk.IsCompressed,
+	})
+}
+
+// Point is one sweep-line event: the start or the end of a chunk's
+// [Offset, Offset+Size) range on the file's byte axis.
+type Point struct {
+ x int64 // file offset of this event
+ ts int64 // the chunk's Mtime; larger ts wins where chunks overlap
+ chunk *filer_pb.FileChunk // the chunk this event belongs to
+ isStart bool // true for the chunk's start offset, false for its end
+}
diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go
new file mode 100644
index 000000000..1920f5185
--- /dev/null
+++ b/weed/filer/filechunks_read_test.go
@@ -0,0 +1,119 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "math/rand"
+ "testing"
+)
+
+// TestReadResolvedChunks resolves a fixed set of overlapping chunks and
+// asserts the resulting visible intervals (newest Mtime wins overlaps).
+func TestReadResolvedChunks(t *testing.T) {
+
+	chunks := []*filer_pb.FileChunk{
+		{
+			FileId: "a",
+			Offset: 0,
+			Size:   100,
+			Mtime:  1,
+		},
+		{
+			FileId: "b",
+			Offset: 50,
+			Size:   100,
+			Mtime:  2,
+		},
+		{
+			FileId: "c",
+			Offset: 200,
+			Size:   50,
+			Mtime:  3,
+		},
+		{
+			FileId: "d",
+			Offset: 250,
+			Size:   50,
+			Mtime:  4,
+		},
+		{
+			FileId: "e",
+			Offset: 175,
+			Size:   100,
+			Mtime:  5,
+		},
+	}
+
+	visibles := readResolvedChunks(chunks)
+
+	for _, visible := range visibles {
+		fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime)
+	}
+
+	// The original test only printed; assert the expected resolution:
+	// "c" is fully covered by "e", and "d" is clipped to [275,300).
+	expected := []struct {
+		start, stop  int64
+		fileId       string
+		modifiedTime int64
+	}{
+		{0, 50, "a", 1},
+		{50, 150, "b", 2},
+		{175, 275, "e", 5},
+		{275, 300, "d", 4},
+	}
+	if len(visibles) != len(expected) {
+		t.Fatalf("expected %d visible intervals, got %d", len(expected), len(visibles))
+	}
+	for i, e := range expected {
+		v := visibles[i]
+		if v.start != e.start || v.stop != e.stop || v.fileId != e.fileId || v.modifiedTime != e.modifiedTime {
+			t.Errorf("interval %d: expected [%d,%d) %s %d, got [%d,%d) %s %d",
+				i, e.start, e.stop, e.fileId, e.modifiedTime, v.start, v.stop, v.fileId, v.modifiedTime)
+		}
+	}
+}
+
+// TestRandomizedReadResolvedChunks replays 1024 random writes into a
+// shadow array and verifies every resolved interval reports the mtime
+// that actually landed last on each covered byte.
+func TestRandomizedReadResolvedChunks(t *testing.T) {
+
+	const limit int64 = 1024 * 1024
+	array := make([]int64, limit)
+	var chunks []*filer_pb.FileChunk
+	for ts := int64(0); ts < 1024; ts++ {
+		a, b := rand.Int63n(limit), rand.Int63n(limit)
+		if a > b {
+			a, b = b, a
+		}
+		size := b - a
+		if size > 1024 {
+			size = 1024
+		}
+		chunks = append(chunks, randomWrite(array, a, size, ts))
+	}
+
+	for _, visible := range readResolvedChunks(chunks) {
+		for pos := visible.start; pos < visible.stop; pos++ {
+			if array[pos] != visible.modifiedTime {
+				t.Errorf("position %d expected ts %d actual ts %d", pos, array[pos], visible.modifiedTime)
+			}
+		}
+	}
+}
+
+// randomWrite records ts over [start, start+size) in the shadow array
+// and returns the FileChunk describing the same write.
+func randomWrite(array []int64, start int64, size int64, ts int64) *filer_pb.FileChunk {
+	end := start + size
+	for off := start; off < end; off++ {
+		array[off] = ts
+	}
+	return &filer_pb.FileChunk{
+		FileId: "",
+		Offset: start,
+		Size:   uint64(size),
+		Mtime:  ts,
+	}
+}
+
+// TestSequentialReadResolvedChunks feeds 13 back-to-back, non-overlapping
+// chunks (identical Mtime) and asserts each maps to one visible interval.
+func TestSequentialReadResolvedChunks(t *testing.T) {
+
+	var chunkSize int64 = 1024 * 1024 * 2
+	var chunks []*filer_pb.FileChunk
+	for ts := int64(0); ts < 13; ts++ {
+		chunks = append(chunks, &filer_pb.FileChunk{
+			FileId: "",
+			Offset: chunkSize * ts,
+			Size:   uint64(chunkSize),
+			Mtime:  1,
+		})
+	}
+
+	visibles := readResolvedChunks(chunks)
+
+	fmt.Printf("visibles %d", len(visibles))
+
+	// The original test only printed; assert count and boundaries.
+	if len(visibles) != 13 {
+		t.Fatalf("expected 13 visible intervals, got %d", len(visibles))
+	}
+	for i, v := range visibles {
+		wantStart, wantStop := chunkSize*int64(i), chunkSize*int64(i+1)
+		if v.start != wantStart || v.stop != wantStop {
+			t.Errorf("interval %d: expected [%d,%d), got [%d,%d)",
+				i, wantStart, wantStop, v.start, v.stop)
+		}
+	}
+}