about summary refs log tree commit diff
diff options
context:
space:
mode:
author Chris Lu <chris.lu@gmail.com> 2018-12-03 22:12:20 -0800
committer Chris Lu <chris.lu@gmail.com> 2018-12-03 22:12:20 -0800
commit c85ee7c0fdb61fe401a8cab2512c65b4e48a8126 (patch)
tree 8cc64a9d7fb799e325567ce57c76d6f77e8702f6
parent 4119c61df84305886e6116705f1ad08d3da69328 (diff)
download seaweedfs-c85ee7c0fdb61fe401a8cab2512c65b4e48a8126.tar.xz
seaweedfs-c85ee7c0fdb61fe401a8cab2512c65b4e48a8126.zip
HCFS: read concatenated files
-rw-r--r-- other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java 3
-rw-r--r-- other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java 3
-rw-r--r-- other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java 73
-rw-r--r-- other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java 66
4 files changed, 116 insertions, 29 deletions
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index dd68e53f1..a399fba13 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -225,10 +225,13 @@ public class SeaweedFileSystemStore {
long writePosition = 0;
if (!overwrite) {
FilerProto.Entry existingEntry = lookupEntry(path);
+ LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry);
if (existingEntry != null) {
+ entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
}
+ LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index b31cae166..c0b296fb9 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -55,6 +55,9 @@ public class SeaweedInputStream extends FSInputStream {
this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
}
public String getPath() {
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
index edc279adc..08aea5745 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
@@ -41,6 +41,7 @@ public class SeaweedRead {
//TODO parallel this
long readCount = 0;
+ int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) {
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
if (locations.getLocationsCount() == 0) {
@@ -59,10 +60,11 @@ public class SeaweedRead {
HttpEntity entity = response.getEntity();
int len = (int) (chunkView.logicOffset - position + chunkView.size);
- entity.getContent().read(buffer, bufferOffset, len);
+ int chunReadCount = entity.getContent().read(buffer, startOffset, len);
- LOG.debug("* read chunkView:{} length:{} position:{} bufferLength:{}", chunkView, len, position, bufferLength);
+ LOG.debug("* read chunkView:{} startOffset:{} length:{} chunReadCount:{}", chunkView, startOffset, len, chunReadCount);
readCount += len;
+ startOffset += len;
} catch (IOException e) {
e.printStackTrace();
@@ -72,17 +74,20 @@ public class SeaweedRead {
return readCount;
}
- private static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
+ public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
List<ChunkView> views = new ArrayList<>();
long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) {
- views.add(new ChunkView(
- chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset
- ));
+ if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ views.add(new ChunkView(
+ chunk.fileId,
+ offset - chunk.start,
+ Math.min(chunk.stop, stop) - offset,
+ offset
+ ));
+ offset = Math.min(chunk.stop, stop);
+ }
}
return views;
}
@@ -96,20 +101,10 @@ public class SeaweedRead {
}
});
- List<VisibleInterval> newVisibles = new ArrayList<>();
List<VisibleInterval> visibles = new ArrayList<>();
for (FilerProto.FileChunk chunk : chunks) {
- List<VisibleInterval> t = newVisibles;
- newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk);
- if (t != newVisibles) {
- // visibles are changed in place
- } else {
- // newVisibles are modified
- visibles.clear();
- t = visibles;
- visibles = newVisibles;
- newVisibles = t;
- }
+ List<VisibleInterval> newVisibles = new ArrayList<>();
+ visibles = mergeIntoVisibles(visibles, newVisibles, chunk);
}
return visibles;
@@ -192,10 +187,10 @@ public class SeaweedRead {
}
public static class VisibleInterval {
- long start;
- long stop;
- long modifiedTime;
- String fileId;
+ public final long start;
+ public final long stop;
+ public final long modifiedTime;
+ public final String fileId;
public VisibleInterval(long start, long stop, String fileId, long modifiedTime) {
this.start = start;
@@ -203,13 +198,23 @@ public class SeaweedRead {
this.modifiedTime = modifiedTime;
this.fileId = fileId;
}
+
+ @Override
+ public String toString() {
+ return "VisibleIntervalq{" +
+ "start=" + start +
+ ", stop=" + stop +
+ ", modifiedTime=" + modifiedTime +
+ ", fileId='" + fileId + '\'' +
+ '}';
+ }
}
public static class ChunkView {
- String fileId;
- long offset;
- long size;
- long logicOffset;
+ public final String fileId;
+ public final long offset;
+ public final long size;
+ public final long logicOffset;
public ChunkView(String fileId, long offset, long size, long logicOffset) {
this.fileId = fileId;
@@ -217,6 +222,16 @@ public class SeaweedRead {
this.size = size;
this.logicOffset = logicOffset;
}
+
+ @Override
+ public String toString() {
+ return "ChunkView{" +
+ "fileId='" + fileId + '\'' +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", logicOffset=" + logicOffset +
+ '}';
+ }
}
}
diff --git a/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java b/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java
new file mode 100644
index 000000000..4bb9efff5
--- /dev/null
+++ b/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java
@@ -0,0 +1,66 @@
+package seaweedfs.hdfs;
+
+import org.junit.Test;
+import seaweed.hdfs.SeaweedRead;
+import seaweedfs.client.FilerProto;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class SeaweedReadTest {
+
+ @Test
+ public void testNonOverlappingVisibleIntervals() {
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("aaa")
+ .setOffset(0)
+ .setSize(100)
+ .setMtime(1000)
+ .build());
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("bbb")
+ .setOffset(100)
+ .setSize(133)
+ .setMtime(2000)
+ .build());
+
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
+ System.out.println("visible:" + visibleInterval);
+ }
+
+ assertEquals(visibleIntervals.size(), 2);
+
+ SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0);
+ assertEquals(visibleInterval.start, 0);
+ assertEquals(visibleInterval.stop, 100);
+ assertEquals(visibleInterval.modifiedTime, 1000);
+ assertEquals(visibleInterval.fileId, "aaa");
+
+ visibleInterval = visibleIntervals.get(1);
+ assertEquals(visibleInterval.start, 100);
+ assertEquals(visibleInterval.stop, 233);
+ assertEquals(visibleInterval.modifiedTime, 2000);
+ assertEquals(visibleInterval.fileId, "bbb");
+
+ List<SeaweedRead.ChunkView> chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233);
+
+ SeaweedRead.ChunkView chunkView = chunkViews.get(0);
+ assertEquals(chunkView.offset, 0);
+ assertEquals(chunkView.size, 100);
+ assertEquals(chunkView.logicOffset, 0);
+ assertEquals(chunkView.fileId, "aaa");
+
+ chunkView = chunkViews.get(1);
+ assertEquals(chunkView.offset, 0);
+ assertEquals(chunkView.size, 133);
+ assertEquals(chunkView.logicOffset, 100);
+ assertEquals(chunkView.fileId, "bbb");
+
+
+ }
+
+}