aboutsummaryrefslogtreecommitdiff
path: root/other/java/hdfs
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-12-27 23:40:23 -0800
committerChris Lu <chris.lu@gmail.com>2018-12-27 23:40:23 -0800
commit9123d799b86a26345d33187463f6d6ed78564fdb (patch)
tree5d5699337a470be024bfd60f66166792b1123d4b /other/java/hdfs
parent319ab6d98ff6f2d17ebda87fc4cfe65bf02667ef (diff)
downloadseaweedfs-9123d799b86a26345d33187463f6d6ed78564fdb.tar.xz
seaweedfs-9123d799b86a26345d33187463f6d6ed78564fdb.zip
refactor
Diffstat (limited to 'other/java/hdfs')
-rw-r--r--other/java/hdfs/pom.xml11
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java21
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java1
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java1
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java1
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java262
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java88
-rw-r--r--other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java65
8 files changed, 3 insertions, 447 deletions
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index d8d6f8b8c..a0cab8752 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -154,17 +154,6 @@
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpmime</artifactId>
- <version>4.5.6</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java
deleted file mode 100644
index e9ea81f36..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package seaweed.hdfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-public class ByteBufferOutputStream extends OutputStream {
- private final ByteBuffer buf;
-
- public ByteBufferOutputStream(ByteBuffer buf) {
- this.buf = buf;
- }
-
- public void write(int b) throws IOException {
- this.buf.put((byte)b);
- }
-
- public void write(byte[] b, int off, int len) throws IOException {
- this.buf.put(b, off, len);
- }
-}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index ffc109b20..27678e615 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedRead;
import java.io.FileNotFoundException;
import java.io.IOException;
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c0b296fb9..90c14c772 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -10,6 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 4dee4bf09..4f307ff96 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -11,6 +11,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
deleted file mode 100644
index a4a2e9743..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
+++ /dev/null
@@ -1,262 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class SeaweedRead {
-
- // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
-
- // returns bytesRead
- public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
- final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) {
-
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
-
- FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
- for (ChunkView chunkView : chunkViews) {
- String vid = parseVolumeId(chunkView.fileId);
- lookupRequest.addVolumeIds(vid);
- }
-
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
-
- Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
-
- //TODO parallel this
- long readCount = 0;
- int startOffset = bufferOffset;
- for (ChunkView chunkView : chunkViews) {
- FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
- if (locations.getLocationsCount() == 0) {
- // log here!
- return 0;
- }
-
- int len = readChunkView(position, buffer, startOffset, chunkView, locations);
-
- readCount += len;
- startOffset += len;
-
- }
-
- return readCount;
- }
-
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) {
- HttpClient client = HttpClientBuilder.create().build();
- HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
-
- if (!chunkView.isFullChunk) {
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
- request.setHeader(HttpHeaders.RANGE,
- String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
- }
-
- try {
- HttpResponse response = client.execute(request);
- HttpEntity entity = response.getEntity();
-
- int len = (int) (chunkView.logicOffset - position + chunkView.size);
- OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
- entity.writeTo(outputStream);
- // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
-
- return len;
-
- } catch (IOException e) {
- e.printStackTrace();
- }
- return 0;
- }
-
- public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
- List<ChunkView> views = new ArrayList<>();
-
- long stop = offset + size;
- for (VisibleInterval chunk : visibleIntervals) {
- if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
- boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
- views.add(new ChunkView(
- chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
- isFullChunk
- ));
- offset = Math.min(chunk.stop, stop);
- }
- }
- return views;
- }
-
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
- FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
- Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
- @Override
- public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- return (int) (a.getMtime() - b.getMtime());
- }
- });
-
- List<VisibleInterval> visibles = new ArrayList<>();
- for (FilerProto.FileChunk chunk : chunks) {
- List<VisibleInterval> newVisibles = new ArrayList<>();
- visibles = mergeIntoVisibles(visibles, newVisibles, chunk);
- }
-
- return visibles;
- }
-
- private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles,
- List<VisibleInterval> newVisibles,
- FilerProto.FileChunk chunk) {
- VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- true
- );
-
- // easy cases to speed up
- if (visibles.size() == 0) {
- visibles.add(newV);
- return visibles;
- }
- if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) {
- visibles.add(newV);
- return visibles;
- }
-
- for (VisibleInterval v : visibles) {
- if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
- newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- false
- ));
- }
- long chunkStop = chunk.getOffset() + chunk.getSize();
- if (v.start < chunkStop && chunkStop < v.stop) {
- newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false
- ));
- }
- if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
- newVisibles.add(v);
- }
- }
- newVisibles.add(newV);
-
- // keep everything sorted
- for (int i = newVisibles.size() - 1; i >= 0; i--) {
- if (i > 0 && newV.start < newVisibles.get(i - 1).start) {
- newVisibles.set(i, newVisibles.get(i - 1));
- } else {
- newVisibles.set(i, newV);
- break;
- }
- }
-
- return newVisibles;
- }
-
- public static String parseVolumeId(String fileId) {
- int commaIndex = fileId.lastIndexOf(',');
- if (commaIndex > 0) {
- return fileId.substring(0, commaIndex);
- }
- return fileId;
- }
-
- public static long totalSize(List<FilerProto.FileChunk> chunksList) {
- long size = 0;
- for (FilerProto.FileChunk chunk : chunksList) {
- long t = chunk.getOffset() + chunk.getSize();
- if (size < t) {
- size = t;
- }
- }
- return size;
- }
-
- public static class VisibleInterval {
- public final long start;
- public final long stop;
- public final long modifiedTime;
- public final String fileId;
- public final boolean isFullChunk;
-
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
- this.start = start;
- this.stop = stop;
- this.modifiedTime = modifiedTime;
- this.fileId = fileId;
- this.isFullChunk = isFullChunk;
- }
-
- @Override
- public String toString() {
- return "VisibleInterval{" +
- "start=" + start +
- ", stop=" + stop +
- ", modifiedTime=" + modifiedTime +
- ", fileId='" + fileId + '\'' +
- ", isFullChunk=" + isFullChunk +
- '}';
- }
- }
-
- public static class ChunkView {
- public final String fileId;
- public final long offset;
- public final long size;
- public final long logicOffset;
- public final boolean isFullChunk;
-
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
- this.fileId = fileId;
- this.offset = offset;
- this.size = size;
- this.logicOffset = logicOffset;
- this.isFullChunk = isFullChunk;
- }
-
- @Override
- public String toString() {
- return "ChunkView{" +
- "fileId='" + fileId + '\'' +
- ", offset=" + offset +
- ", size=" + size +
- ", logicOffset=" + logicOffset +
- ", isFullChunk=" + isFullChunk +
- '}';
- }
- }
-
-}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
deleted file mode 100644
index ee2131007..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.mime.HttpMultipartMode;
-import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SeaweedWrite {
-
- public static void writeData(FilerProto.Entry.Builder entry,
- final String replication,
- final FilerGrpcClient filerGrpcClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
- FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
- FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection("")
- .setReplication(replication)
- .setDataCenter("")
- .setReplication("")
- .setTtlSec(0)
- .build());
- String fileId = response.getFileId();
- String url = response.getUrl();
- String targetUrl = String.format("http://%s/%s", url, fileId);
-
- String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength);
-
- entry.addChunks(FilerProto.FileChunk.newBuilder()
- .setFileId(fileId)
- .setOffset(offset)
- .setSize(bytesLength)
- .setMtime(System.currentTimeMillis() / 10000L)
- .setETag(etag)
- );
-
- }
-
- public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
- filerGrpcClient.getBlockingStub().createEntry(
- FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parentDirectory)
- .setEntry(entry)
- .build()
- );
- }
-
- private static String multipartUpload(String targetUrl,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
-
- CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build();
-
- InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
-
- HttpPost post = new HttpPost(targetUrl);
-
- post.setEntity(MultipartEntityBuilder.create()
- .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
- .addBinaryBody("upload", inputStream)
- .build());
-
- try {
- HttpResponse response = client.execute(post);
-
- String etag = response.getLastHeader("ETag").getValue();
-
- if (etag != null && etag.startsWith("\"") && etag.endsWith("\"")) {
- etag = etag.substring(1, etag.length() - 1);
- }
-
- return etag;
- } finally {
- client.close();
- }
-
- }
-}
diff --git a/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java b/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java
deleted file mode 100644
index e3ab97910..000000000
--- a/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package seaweed.hdfs;
-
-import org.junit.Test;
-import seaweedfs.client.FilerProto;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class SeaweedReadTest {
-
- @Test
- public void testNonOverlappingVisibleIntervals() {
- List<FilerProto.FileChunk> chunks = new ArrayList<>();
- chunks.add(FilerProto.FileChunk.newBuilder()
- .setFileId("aaa")
- .setOffset(0)
- .setSize(100)
- .setMtime(1000)
- .build());
- chunks.add(FilerProto.FileChunk.newBuilder()
- .setFileId("bbb")
- .setOffset(100)
- .setSize(133)
- .setMtime(2000)
- .build());
-
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
- for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
- System.out.println("visible:" + visibleInterval);
- }
-
- assertEquals(visibleIntervals.size(), 2);
-
- SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0);
- assertEquals(visibleInterval.start, 0);
- assertEquals(visibleInterval.stop, 100);
- assertEquals(visibleInterval.modifiedTime, 1000);
- assertEquals(visibleInterval.fileId, "aaa");
-
- visibleInterval = visibleIntervals.get(1);
- assertEquals(visibleInterval.start, 100);
- assertEquals(visibleInterval.stop, 233);
- assertEquals(visibleInterval.modifiedTime, 2000);
- assertEquals(visibleInterval.fileId, "bbb");
-
- List<SeaweedRead.ChunkView> chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233);
-
- SeaweedRead.ChunkView chunkView = chunkViews.get(0);
- assertEquals(chunkView.offset, 0);
- assertEquals(chunkView.size, 100);
- assertEquals(chunkView.logicOffset, 0);
- assertEquals(chunkView.fileId, "aaa");
-
- chunkView = chunkViews.get(1);
- assertEquals(chunkView.offset, 0);
- assertEquals(chunkView.size, 133);
- assertEquals(chunkView.logicOffset, 100);
- assertEquals(chunkView.fileId, "bbb");
-
-
- }
-
-}