aboutsummaryrefslogtreecommitdiff
path: root/other/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client/src/main')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ReadChunks.java109
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java90
2 files changed, 110 insertions, 89 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java
new file mode 100644
index 000000000..2eba4f808
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java
@@ -0,0 +1,109 @@
+package seaweedfs.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class ReadChunks {
+
+ public static List<SeaweedRead.VisibleInterval> readResolvedChunks(List<FilerProto.FileChunk> chunkList) throws IOException {
+ List<Point> points = new ArrayList<>(chunkList.size() * 2);
+ for (FilerProto.FileChunk chunk : chunkList) {
+ points.add(new Point(chunk.getOffset(), chunk, true));
+ points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false));
+ }
+ Collections.sort(points, new Comparator<Point>() {
+ @Override
+ public int compare(Point a, Point b) {
+ int x = (int) (a.x - b.x);
+ if (a.x != b.x) {
+ return (int) (a.x - b.x);
+ }
+ if (a.ts != b.ts) {
+ return (int) (a.ts - b.ts);
+ }
+ if (!a.isStart) {
+ return -1;
+ }
+ return 1;
+ }
+ });
+
+ long prevX = 0;
+ List<SeaweedRead.VisibleInterval> visibles = new ArrayList<>();
+ ArrayList<Point> queue = new ArrayList<>();
+ for (Point point : points) {
+ if (point.isStart) {
+ if (queue.size() > 0) {
+ int lastIndex = queue.size() - 1;
+ Point lastPoint = queue.get(lastIndex);
+ if (point.x != prevX && lastPoint.ts < point.ts) {
+ addToVisibles(visibles, prevX, lastPoint, point);
+ prevX = point.x;
+ }
+ }
+ // insert into queue
+ for (int i = queue.size(); i >= 0; i--) {
+ if (i == 0 || queue.get(i - 1).ts <= point.ts) {
+ if (i == queue.size()) {
+ prevX = point.x;
+ }
+ queue.add(i, point);
+ break;
+ }
+ }
+ } else {
+ int lastIndex = queue.size() - 1;
+ int index = lastIndex;
+ Point startPoint = null;
+ for (; index >= 0; index--) {
+ startPoint = queue.get(index);
+ if (startPoint.ts == point.ts) {
+ queue.remove(index);
+ break;
+ }
+ }
+ if (index == lastIndex && startPoint != null) {
+ addToVisibles(visibles, prevX, startPoint, point);
+ prevX = point.x;
+ }
+ }
+ }
+
+ return visibles;
+
+ }
+
+ private static void addToVisibles(List<SeaweedRead.VisibleInterval> visibles, long prevX, Point startPoint, Point point) {
+ if (prevX < point.x) {
+ FilerProto.FileChunk chunk = startPoint.chunk;
+ visibles.add(new SeaweedRead.VisibleInterval(
+ prevX,
+ point.x,
+ chunk.getFileId(),
+ chunk.getMtime(),
+ prevX - chunk.getOffset(),
+ chunk.getOffset() == prevX && chunk.getSize() == prevX - startPoint.x,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed()
+ ));
+ }
+ }
+
+ static class Point {
+ long x;
+ long ts;
+ FilerProto.FileChunk chunk;
+ boolean isStart;
+
+ public Point(long x, FilerProto.FileChunk chunk, boolean isStart) {
+ this.x = x;
+ this.ts = chunk.getMtime();
+ this.chunk = chunk;
+ this.isStart = isStart;
+ }
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 011462a17..41033befb 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -226,96 +226,8 @@ public class SeaweedRead {
chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList);
- FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
- Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
- @Override
- public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- // if just a.getMtime() - b.getMtime(), it will overflow!
- if (a.getMtime() < b.getMtime()) {
- return -1;
- } else if (a.getMtime() > b.getMtime()) {
- return 1;
- }
- return 0;
- }
- });
-
- List<VisibleInterval> visibles = new ArrayList<>();
- for (FilerProto.FileChunk chunk : chunks) {
- List<VisibleInterval> newVisibles = new ArrayList<>();
- visibles = mergeIntoVisibles(visibles, newVisibles, chunk);
- }
-
- return visibles;
- }
-
- private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles,
- List<VisibleInterval> newVisibles,
- FilerProto.FileChunk chunk) {
- VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- 0,
- true,
- chunk.getCipherKey().toByteArray(),
- chunk.getIsCompressed()
- );
-
- // easy cases to speed up
- if (visibles.size() == 0) {
- visibles.add(newV);
- return visibles;
- }
- if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) {
- visibles.add(newV);
- return visibles;
- }
-
- for (VisibleInterval v : visibles) {
- if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
- newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- v.chunkOffset,
- false,
- v.cipherKey,
- v.isCompressed
- ));
- }
- long chunkStop = chunk.getOffset() + chunk.getSize();
- if (v.start < chunkStop && chunkStop < v.stop) {
- newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- v.chunkOffset + (chunkStop - v.start),
- false,
- v.cipherKey,
- v.isCompressed
- ));
- }
- if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
- newVisibles.add(v);
- }
- }
- newVisibles.add(newV);
-
- // keep everything sorted
- for (int i = newVisibles.size() - 1; i >= 0; i--) {
- if (i > 0 && newV.start < newVisibles.get(i - 1).start) {
- newVisibles.set(i, newVisibles.get(i - 1));
- } else {
- newVisibles.set(i, newV);
- break;
- }
- }
+ return ReadChunks.readResolvedChunks(chunkList);
- return newVisibles;
}
public static String parseVolumeId(String fileId) {