diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/filer/filechunks.go | 23 | ||||
| -rw-r--r-- | weed/filer/filechunks_read.go | 119 | ||||
| -rw-r--r-- | weed/filer/filechunks_read_test.go | 119 |
3 files changed, 261 insertions, 0 deletions
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 0dc03f6e2..be18d45ac 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -225,6 +225,12 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset) + visibles2 := readResolvedChunks(chunks) + + if true { + return visibles2, err + } + sort.Slice(chunks, func(i, j int) bool { if chunks[i].Mtime == chunks[j].Mtime { filer_pb.EnsureFid(chunks[i]) @@ -246,9 +252,26 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction } + if len(visibles) != len(visibles2) { + fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2)) + } else { + for i := 0; i < len(visibles); i++ { + checkDifference(visibles[i], visibles2[i]) + } + } + return } +func checkDifference(x, y VisibleInterval) { + if x.start != y.start || + x.stop != y.stop || + x.fileId != y.fileId || + x.modifiedTime != y.modifiedTime { + fmt.Printf("different visible %+v : %+v\n", x, y) + } +} + // find non-overlapping visible intervals // visible interval map to one file chunk diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go new file mode 100644 index 000000000..b39f6a35c --- /dev/null +++ b/weed/filer/filechunks_read.go @@ -0,0 +1,119 @@ +package filer + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "sort" +) + +func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { + + var points []*Point + for _, chunk := range chunks { + points = append(points, &Point{ + x: chunk.Offset, + ts: chunk.Mtime, + chunk: chunk, + isStart: true, + }) + points = append(points, &Point{ + x: chunk.Offset + int64(chunk.Size), + ts: chunk.Mtime, + chunk: chunk, + isStart: false, + }) + } + sort.Slice(points, func(i, j int) bool { + if points[i].x != points[j].x { + return points[i].x < points[j].x + } + if points[i].ts != points[j].ts { + return points[i].ts < points[j].ts + } + if !points[i].isStart { + return true + } + return false + }) + + var prevX int64 + var queue []*Point + for _, point := range points { + if point.isStart { + if len(queue) > 0 { + lastIndex := len(queue) -1 + lastPoint := queue[lastIndex] + if point.x != prevX && lastPoint.ts < point.ts { + visibles = addToVisibles(visibles, prevX, lastPoint, point) + prevX = point.x + } + } + // insert into queue + for i := len(queue); i >= 0; i-- { + if i == 0 || queue[i-1].ts <= point.ts { + if i == len(queue) { + prevX = point.x + } + queue = addToQueue(queue, i, point) + break + } + } + } else { + lastIndex := len(queue) - 1 + index := lastIndex + var startPoint *Point + for ; index >= 0; index-- { + startPoint = queue[index] + if startPoint.ts == point.ts { + queue = removeFromQueue(queue, index) + break + } + } + if index == lastIndex && startPoint != nil { + visibles = addToVisibles(visibles, prevX, startPoint, point) + prevX = point.x + } + } + } + + return +} + +func removeFromQueue(queue []*Point, index int) []*Point { + for i := index; i < len(queue)-1; i++ { + queue[i] = queue[i+1] + } + queue = queue[:len(queue)-1] + return queue +} + +func addToQueue(queue []*Point, index int, point *Point) []*Point { + queue = append(queue, point) + for i := len(queue) - 1; i > index; i-- { + queue[i], queue[i-1] = queue[i-1], queue[i] + } + return queue +} + +func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval { + if prevX < point.x { + chunk := startPoint.chunk + visibles = append(visibles, VisibleInterval{ + start: prevX, + stop: point.x, + fileId: chunk.FileId, + modifiedTime: chunk.Mtime, + chunkOffset: prevX - chunk.Offset, + chunkSize: chunk.Size, + cipherKey: chunk.CipherKey, + isGzipped: chunk.IsCompressed, + }) + } + return visibles +} + +type Point struct { + x int64 + ts int64 + chunk *filer_pb.FileChunk + isStart bool +} diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go new file mode 100644 index 000000000..1920f5185 --- /dev/null +++ b/weed/filer/filechunks_read_test.go @@ -0,0 +1,119 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "math/rand" + "testing" +) + +func TestReadResolvedChunks(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + { + FileId: "a", + Offset: 0, + Size: 100, + Mtime: 1, + }, + { + FileId: "b", + Offset: 50, + Size: 100, + Mtime: 2, + }, + { + FileId: "c", + Offset: 200, + Size: 50, + Mtime: 3, + }, + { + FileId: "d", + Offset: 250, + Size: 50, + Mtime: 4, + }, + { + FileId: "e", + Offset: 175, + Size: 100, + Mtime: 5, + }, + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime) + } + +} + +func TestRandomizedReadResolvedChunks(t *testing.T) { + + var limit int64 = 1024*1024 + array := make([]int64, limit) + var chunks []*filer_pb.FileChunk + for ts := int64(0); ts < 1024; ts++ { + x := rand.Int63n(limit) + y := rand.Int63n(limit) + size := x - y + if size < 0 { + size = -size + } + if size > 1024 { + size = 1024 + } + start := x + if start > y { + start = y + } + chunks = append(chunks, randomWrite(array, start, size, ts)) + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + for i := visible.start; i<visible.stop;i++{ + if array[i] != visible.modifiedTime { + t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTime) + } + } + } + + // fmt.Printf("visibles %d", len(visibles)) + +} + +func randomWrite(array []int64, start int64, size int64, ts int64) *filer_pb.FileChunk { + for i := start; i < start+size; i++ { + array[i] = ts + } + // fmt.Printf("write [%d,%d) %d\n", start, start+size, ts) + return &filer_pb.FileChunk{ + FileId: "", + Offset: start, + Size: uint64(size), + Mtime: ts, + } +} + +func TestSequentialReadResolvedChunks(t *testing.T) { + + var chunkSize int64 = 1024*1024*2 + var chunks []*filer_pb.FileChunk + for ts := int64(0); ts < 13; ts++ { + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: "", + Offset: chunkSize*ts, + Size: uint64(chunkSize), + Mtime: 1, + }) + } + + visibles := readResolvedChunks(chunks) + + fmt.Printf("visibles %d", len(visibles)) + +} |
