aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-16 16:03:16 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-16 16:03:16 -0700
commit73369906390467bb2e069a7db542801294507fc5 (patch)
tree17239b3f7b3e7c04f3c6c3d50fc663002793a752
parent8965a53c4dff019f719b9818800e78c6e7c4648a (diff)
downloadseaweedfs-73369906390467bb2e069a7db542801294507fc5.tar.xz
seaweedfs-73369906390467bb2e069a7db542801294507fc5.zip
faster file read for large files
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ReadChunks.java109
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java90
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java123
-rw-r--r--weed/filer/filechunks.go23
-rw-r--r--weed/filer/filechunks_read.go119
-rw-r--r--weed/filer/filechunks_read_test.go119
6 files changed, 484 insertions, 99 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java
new file mode 100644
index 000000000..2eba4f808
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java
@@ -0,0 +1,109 @@
+package seaweedfs.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class ReadChunks {
+
+ public static List<SeaweedRead.VisibleInterval> readResolvedChunks(List<FilerProto.FileChunk> chunkList) throws IOException {
+ List<Point> points = new ArrayList<>(chunkList.size() * 2);
+ for (FilerProto.FileChunk chunk : chunkList) {
+ points.add(new Point(chunk.getOffset(), chunk, true));
+ points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false));
+ }
+ Collections.sort(points, new Comparator<Point>() {
+ @Override
+ public int compare(Point a, Point b) {
+ int x = (int) (a.x - b.x);
+ if (a.x != b.x) {
+ return (int) (a.x - b.x);
+ }
+ if (a.ts != b.ts) {
+ return (int) (a.ts - b.ts);
+ }
+ if (!a.isStart) {
+ return -1;
+ }
+ return 1;
+ }
+ });
+
+ long prevX = 0;
+ List<SeaweedRead.VisibleInterval> visibles = new ArrayList<>();
+ ArrayList<Point> queue = new ArrayList<>();
+ for (Point point : points) {
+ if (point.isStart) {
+ if (queue.size() > 0) {
+ int lastIndex = queue.size() - 1;
+ Point lastPoint = queue.get(lastIndex);
+ if (point.x != prevX && lastPoint.ts < point.ts) {
+ addToVisibles(visibles, prevX, lastPoint, point);
+ prevX = point.x;
+ }
+ }
+ // insert into queue
+ for (int i = queue.size(); i >= 0; i--) {
+ if (i == 0 || queue.get(i - 1).ts <= point.ts) {
+ if (i == queue.size()) {
+ prevX = point.x;
+ }
+ queue.add(i, point);
+ break;
+ }
+ }
+ } else {
+ int lastIndex = queue.size() - 1;
+ int index = lastIndex;
+ Point startPoint = null;
+ for (; index >= 0; index--) {
+ startPoint = queue.get(index);
+ if (startPoint.ts == point.ts) {
+ queue.remove(index);
+ break;
+ }
+ }
+ if (index == lastIndex && startPoint != null) {
+ addToVisibles(visibles, prevX, startPoint, point);
+ prevX = point.x;
+ }
+ }
+ }
+
+ return visibles;
+
+ }
+
+ private static void addToVisibles(List<SeaweedRead.VisibleInterval> visibles, long prevX, Point startPoint, Point point) {
+ if (prevX < point.x) {
+ FilerProto.FileChunk chunk = startPoint.chunk;
+ visibles.add(new SeaweedRead.VisibleInterval(
+ prevX,
+ point.x,
+ chunk.getFileId(),
+ chunk.getMtime(),
+ prevX - chunk.getOffset(),
+ chunk.getOffset() == prevX && chunk.getSize() == prevX - startPoint.x,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed()
+ ));
+ }
+ }
+
+ static class Point {
+ long x;
+ long ts;
+ FilerProto.FileChunk chunk;
+ boolean isStart;
+
+ public Point(long x, FilerProto.FileChunk chunk, boolean isStart) {
+ this.x = x;
+ this.ts = chunk.getMtime();
+ this.chunk = chunk;
+ this.isStart = isStart;
+ }
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 011462a17..41033befb 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -226,96 +226,8 @@ public class SeaweedRead {
chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList);
- FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
- Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
- @Override
- public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- // if just a.getMtime() - b.getMtime(), it will overflow!
- if (a.getMtime() < b.getMtime()) {
- return -1;
- } else if (a.getMtime() > b.getMtime()) {
- return 1;
- }
- return 0;
- }
- });
-
- List<VisibleInterval> visibles = new ArrayList<>();
- for (FilerProto.FileChunk chunk : chunks) {
- List<VisibleInterval> newVisibles = new ArrayList<>();
- visibles = mergeIntoVisibles(visibles, newVisibles, chunk);
- }
-
- return visibles;
- }
-
- private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles,
- List<VisibleInterval> newVisibles,
- FilerProto.FileChunk chunk) {
- VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- 0,
- true,
- chunk.getCipherKey().toByteArray(),
- chunk.getIsCompressed()
- );
-
- // easy cases to speed up
- if (visibles.size() == 0) {
- visibles.add(newV);
- return visibles;
- }
- if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) {
- visibles.add(newV);
- return visibles;
- }
-
- for (VisibleInterval v : visibles) {
- if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
- newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- v.chunkOffset,
- false,
- v.cipherKey,
- v.isCompressed
- ));
- }
- long chunkStop = chunk.getOffset() + chunk.getSize();
- if (v.start < chunkStop && chunkStop < v.stop) {
- newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- v.chunkOffset + (chunkStop - v.start),
- false,
- v.cipherKey,
- v.isCompressed
- ));
- }
- if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
- newVisibles.add(v);
- }
- }
- newVisibles.add(newV);
-
- // keep everything sorted
- for (int i = newVisibles.size() - 1; i >= 0; i--) {
- if (i > 0 && newV.start < newVisibles.get(i - 1).start) {
- newVisibles.set(i, newVisibles.get(i - 1));
- } else {
- newVisibles.set(i, newV);
- break;
- }
- }
+ return ReadChunks.readResolvedChunks(chunkList);
- return newVisibles;
}
public static String parseVolumeId(String fileId) {
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index 44b833c90..6ad9edb2c 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -6,6 +6,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
public class SeaweedReadTest {
@@ -13,17 +14,17 @@ public class SeaweedReadTest {
public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
- .setFileId("aaa")
- .setOffset(0)
- .setSize(100)
- .setMtime(1000)
- .build());
+ .setFileId("aaa")
+ .setOffset(0)
+ .setSize(100)
+ .setMtime(1000)
+ .build());
chunks.add(FilerProto.FileChunk.newBuilder()
- .setFileId("bbb")
- .setOffset(100)
- .setSize(133)
- .setMtime(2000)
- .build());
+ .setFileId("bbb")
+ .setOffset(100)
+ .setSize(133)
+ .setMtime(2000)
+ .build());
List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
@@ -61,4 +62,106 @@ public class SeaweedReadTest {
}
+
+ @Test
+ public void testReadResolvedChunks() throws IOException {
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("a")
+ .setOffset(0)
+ .setSize(100)
+ .setMtime(1)
+ .build());
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("b")
+ .setOffset(50)
+ .setSize(100)
+ .setMtime(2)
+ .build());
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("c")
+ .setOffset(200)
+ .setSize(50)
+ .setMtime(3)
+ .build());
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("d")
+ .setOffset(250)
+ .setSize(50)
+ .setMtime(4)
+ .build());
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("e")
+ .setOffset(175)
+ .setSize(100)
+ .setMtime(5)
+ .build());
+
+ List<SeaweedRead.VisibleInterval> visibleIntervals = ReadChunks.readResolvedChunks(chunks);
+ for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
+ System.out.println("visible:" + visibleInterval);
+ }
+
+ Assert.assertEquals(4, visibleIntervals.size());
+
+ SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0);
+ Assert.assertEquals(visibleInterval.start, 0);
+ Assert.assertEquals(visibleInterval.stop, 50);
+ Assert.assertEquals(visibleInterval.modifiedTime, 1);
+ Assert.assertEquals(visibleInterval.fileId, "a");
+
+ visibleInterval = visibleIntervals.get(1);
+ Assert.assertEquals(visibleInterval.start, 50);
+ Assert.assertEquals(visibleInterval.stop, 150);
+ Assert.assertEquals(visibleInterval.modifiedTime, 2);
+ Assert.assertEquals(visibleInterval.fileId, "b");
+
+ visibleInterval = visibleIntervals.get(2);
+ Assert.assertEquals(visibleInterval.start, 175);
+ Assert.assertEquals(visibleInterval.stop, 275);
+ Assert.assertEquals(visibleInterval.modifiedTime, 5);
+ Assert.assertEquals(visibleInterval.fileId, "e");
+
+ visibleInterval = visibleIntervals.get(3);
+ Assert.assertEquals(visibleInterval.start, 275);
+ Assert.assertEquals(visibleInterval.stop, 300);
+ Assert.assertEquals(visibleInterval.modifiedTime, 4);
+ Assert.assertEquals(visibleInterval.fileId, "d");
+
+ }
+
+
+ @Test
+ public void testRandomizedReadResolvedChunks() throws IOException {
+ Random random = new Random();
+ int limit = 1024*1024;
+ long[] array = new long[limit];
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+ for (long ts=0;ts<1024;ts++){
+ int x = random.nextInt(limit);
+ int y = random.nextInt(limit);
+ int size = Math.min(Math.abs(x-y), 1024);
+ chunks.add(randomWrite(array, Math.min(x,y), size, ts));
+ }
+
+ List<SeaweedRead.VisibleInterval> visibleIntervals = ReadChunks.readResolvedChunks(chunks);
+ for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
+ System.out.println("visible:" + visibleInterval);
+ for (int i = (int) visibleInterval.start; i<visibleInterval.stop; i++) {
+ Assert.assertEquals(array[i], visibleInterval.modifiedTime);
+ }
+ }
+
+ }
+ private FilerProto.FileChunk randomWrite(long[] array, int start, int size, long ts) {
+ for (int i=start;i<start+size;i++) {
+ array[i] = ts;
+ }
+ return FilerProto.FileChunk.newBuilder()
+ .setFileId("")
+ .setOffset(start)
+ .setSize(size)
+ .setMtime(ts)
+ .build();
+ }
}
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 0dc03f6e2..be18d45ac 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -225,6 +225,12 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
+ visibles2 := readResolvedChunks(chunks)
+
+ if true {
+ return visibles2, err
+ }
+
sort.Slice(chunks, func(i, j int) bool {
if chunks[i].Mtime == chunks[j].Mtime {
filer_pb.EnsureFid(chunks[i])
@@ -246,9 +252,26 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction
}
+ if len(visibles) != len(visibles2) {
+ fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2))
+ } else {
+ for i := 0; i < len(visibles); i++ {
+ checkDifference(visibles[i], visibles2[i])
+ }
+ }
+
return
}
+func checkDifference(x, y VisibleInterval) {
+ if x.start != y.start ||
+ x.stop != y.stop ||
+ x.fileId != y.fileId ||
+ x.modifiedTime != y.modifiedTime {
+ fmt.Printf("different visible %+v : %+v\n", x, y)
+ }
+}
+
// find non-overlapping visible intervals
// visible interval map to one file chunk
diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go
new file mode 100644
index 000000000..b39f6a35c
--- /dev/null
+++ b/weed/filer/filechunks_read.go
@@ -0,0 +1,119 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "sort"
+)
+
+func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+
+ var points []*Point
+ for _, chunk := range chunks {
+ points = append(points, &Point{
+ x: chunk.Offset,
+ ts: chunk.Mtime,
+ chunk: chunk,
+ isStart: true,
+ })
+ points = append(points, &Point{
+ x: chunk.Offset + int64(chunk.Size),
+ ts: chunk.Mtime,
+ chunk: chunk,
+ isStart: false,
+ })
+ }
+ sort.Slice(points, func(i, j int) bool {
+ if points[i].x != points[j].x {
+ return points[i].x < points[j].x
+ }
+ if points[i].ts != points[j].ts {
+ return points[i].ts < points[j].ts
+ }
+ if !points[i].isStart {
+ return true
+ }
+ return false
+ })
+
+ var prevX int64
+ var queue []*Point
+ for _, point := range points {
+ if point.isStart {
+ if len(queue) > 0 {
+ lastIndex := len(queue) -1
+ lastPoint := queue[lastIndex]
+ if point.x != prevX && lastPoint.ts < point.ts {
+ visibles = addToVisibles(visibles, prevX, lastPoint, point)
+ prevX = point.x
+ }
+ }
+ // insert into queue
+ for i := len(queue); i >= 0; i-- {
+ if i == 0 || queue[i-1].ts <= point.ts {
+ if i == len(queue) {
+ prevX = point.x
+ }
+ queue = addToQueue(queue, i, point)
+ break
+ }
+ }
+ } else {
+ lastIndex := len(queue) - 1
+ index := lastIndex
+ var startPoint *Point
+ for ; index >= 0; index-- {
+ startPoint = queue[index]
+ if startPoint.ts == point.ts {
+ queue = removeFromQueue(queue, index)
+ break
+ }
+ }
+ if index == lastIndex && startPoint != nil {
+ visibles = addToVisibles(visibles, prevX, startPoint, point)
+ prevX = point.x
+ }
+ }
+ }
+
+ return
+}
+
+func removeFromQueue(queue []*Point, index int) []*Point {
+ for i := index; i < len(queue)-1; i++ {
+ queue[i] = queue[i+1]
+ }
+ queue = queue[:len(queue)-1]
+ return queue
+}
+
+func addToQueue(queue []*Point, index int, point *Point) []*Point {
+ queue = append(queue, point)
+ for i := len(queue) - 1; i > index; i-- {
+ queue[i], queue[i-1] = queue[i-1], queue[i]
+ }
+ return queue
+}
+
+func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval {
+ if prevX < point.x {
+ chunk := startPoint.chunk
+ visibles = append(visibles, VisibleInterval{
+ start: prevX,
+ stop: point.x,
+ fileId: chunk.FileId,
+ modifiedTime: chunk.Mtime,
+ chunkOffset: prevX - chunk.Offset,
+ chunkSize: chunk.Size,
+ cipherKey: chunk.CipherKey,
+ isGzipped: chunk.IsCompressed,
+ })
+ }
+ return visibles
+}
+
+type Point struct {
+ x int64
+ ts int64
+ chunk *filer_pb.FileChunk
+ isStart bool
+}
diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go
new file mode 100644
index 000000000..1920f5185
--- /dev/null
+++ b/weed/filer/filechunks_read_test.go
@@ -0,0 +1,119 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "math/rand"
+ "testing"
+)
+
+func TestReadResolvedChunks(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {
+ FileId: "a",
+ Offset: 0,
+ Size: 100,
+ Mtime: 1,
+ },
+ {
+ FileId: "b",
+ Offset: 50,
+ Size: 100,
+ Mtime: 2,
+ },
+ {
+ FileId: "c",
+ Offset: 200,
+ Size: 50,
+ Mtime: 3,
+ },
+ {
+ FileId: "d",
+ Offset: 250,
+ Size: 50,
+ Mtime: 4,
+ },
+ {
+ FileId: "e",
+ Offset: 175,
+ Size: 100,
+ Mtime: 5,
+ },
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ for _, visible := range visibles {
+ fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime)
+ }
+
+}
+
+func TestRandomizedReadResolvedChunks(t *testing.T) {
+
+ var limit int64 = 1024*1024
+ array := make([]int64, limit)
+ var chunks []*filer_pb.FileChunk
+ for ts := int64(0); ts < 1024; ts++ {
+ x := rand.Int63n(limit)
+ y := rand.Int63n(limit)
+ size := x - y
+ if size < 0 {
+ size = -size
+ }
+ if size > 1024 {
+ size = 1024
+ }
+ start := x
+ if start > y {
+ start = y
+ }
+ chunks = append(chunks, randomWrite(array, start, size, ts))
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ for _, visible := range visibles {
+ for i := visible.start; i<visible.stop;i++{
+ if array[i] != visible.modifiedTime {
+ t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTime)
+ }
+ }
+ }
+
+ // fmt.Printf("visibles %d", len(visibles))
+
+}
+
+func randomWrite(array []int64, start int64, size int64, ts int64) *filer_pb.FileChunk {
+ for i := start; i < start+size; i++ {
+ array[i] = ts
+ }
+ // fmt.Printf("write [%d,%d) %d\n", start, start+size, ts)
+ return &filer_pb.FileChunk{
+ FileId: "",
+ Offset: start,
+ Size: uint64(size),
+ Mtime: ts,
+ }
+}
+
+func TestSequentialReadResolvedChunks(t *testing.T) {
+
+ var chunkSize int64 = 1024*1024*2
+ var chunks []*filer_pb.FileChunk
+ for ts := int64(0); ts < 13; ts++ {
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: "",
+ Offset: chunkSize*ts,
+ Size: uint64(chunkSize),
+ Mtime: 1,
+ })
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ fmt.Printf("visibles %d", len(visibles))
+
+}