author     Chris Lu <chris.lu@gmail.com>   2018-12-27 23:40:23 -0800
committer  Chris Lu <chris.lu@gmail.com>   2018-12-27 23:40:23 -0800
commit     9123d799b86a26345d33187463f6d6ed78564fdb (patch)
tree       5d5699337a470be024bfd60f66166792b1123d4b /other/java/client
parent     319ab6d98ff6f2d17ebda87fc4cfe65bf02667ef (diff)
refactor
Diffstat (limited to 'other/java/client')
-rw-r--r--  other/java/client/pom.xml                                                       11
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java    21
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java              260
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java              86
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java           63
5 files changed, 441 insertions(+), 0 deletions(-)
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index db2634b2b..1ea39863f 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -50,6 +50,17 @@
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<distributionManagement>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java
new file mode 100644
index 000000000..efcc265de
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java
@@ -0,0 +1,21 @@
+package seaweedfs.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ByteBufferOutputStream extends OutputStream {
+ private final ByteBuffer buf;
+
+ public ByteBufferOutputStream(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ public void write(int b) throws IOException {
+ this.buf.put((byte)b);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ this.buf.put(b, off, len);
+ }
+}
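ByteBufferOutputStream adapts a ByteBuffer to the OutputStream interface so that an HTTP response entity can be written straight into a slice of an existing byte[]; readChunkView in SeaweedRead.java below relies on this. A minimal usage sketch, not part of the commit (the example class name is made up):

package seaweedfs.client;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class ByteBufferOutputStreamExample {
    public static void main(String[] args) throws IOException {
        byte[] buffer = new byte[16];
        // Expose bytes 4..11 of the array as an OutputStream target.
        OutputStream out = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, 4, 8));
        out.write("chunk".getBytes());
        System.out.println(new String(buffer, 4, 5)); // prints "chunk"
    }
}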
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
new file mode 100644
index 000000000..a906a689b
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -0,0 +1,260 @@
+package seaweedfs.client;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class SeaweedRead {
+
+ // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
+ // returns bytesRead
+ public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
+ final long position, final byte[] buffer, final int bufferOffset,
+ final int bufferLength) {
+
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ for (ChunkView chunkView : chunkViews) {
+ String vid = parseVolumeId(chunkView.fileId);
+ lookupRequest.addVolumeIds(vid);
+ }
+
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+
+ // TODO: parallelize this
+ long readCount = 0;
+ int startOffset = bufferOffset;
+ for (ChunkView chunkView : chunkViews) {
+ FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
+ if (locations.getLocationsCount() == 0) {
+ // log here!
+ return 0;
+ }
+
+ int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+
+ readCount += len;
+ startOffset += len;
+
+ }
+
+ return readCount;
+ }
+
+ private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) {
+ HttpClient client = HttpClientBuilder.create().build();
+ HttpGet request = new HttpGet(
+ String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+
+ if (!chunkView.isFullChunk) {
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+ request.setHeader(HttpHeaders.RANGE,
+ String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
+ }
+
+ try {
+ HttpResponse response = client.execute(request);
+ HttpEntity entity = response.getEntity();
+
+ int len = (int) (chunkView.logicOffset - position + chunkView.size);
+ OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
+ entity.writeTo(outputStream);
+ // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
+
+ return len;
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+ public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
+ List<ChunkView> views = new ArrayList<>();
+
+ long stop = offset + size;
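+ // Walk the visible intervals in order, cut one ChunkView per interval that
+ // overlaps [offset, offset+size), and advance offset past each emitted view.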
+ for (VisibleInterval chunk : visibleIntervals) {
+ if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
+ views.add(new ChunkView(
+ chunk.fileId,
+ offset - chunk.start,
+ Math.min(chunk.stop, stop) - offset,
+ offset,
+ isFullChunk
+ ));
+ offset = Math.min(chunk.stop, stop);
+ }
+ }
+ return views;
+ }
+
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
+ Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
+ @Override
+ public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
+ return Long.compare(a.getMtime(), b.getMtime());
+ }
+ });
+
+ List<VisibleInterval> visibles = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : chunks) {
+ List<VisibleInterval> newVisibles = new ArrayList<>();
+ visibles = mergeIntoVisibles(visibles, newVisibles, chunk);
+ }
+
+ return visibles;
+ }
+
+ private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles,
+ List<VisibleInterval> newVisibles,
+ FilerProto.FileChunk chunk) {
+ VisibleInterval newV = new VisibleInterval(
+ chunk.getOffset(),
+ chunk.getOffset() + chunk.getSize(),
+ chunk.getFileId(),
+ chunk.getMtime(),
+ true
+ );
+
+ // easy cases to speed up
+ if (visibles.size() == 0) {
+ visibles.add(newV);
+ return visibles;
+ }
+ if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) {
+ visibles.add(newV);
+ return visibles;
+ }
+
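+ // General case: clip every older interval around the new chunk, keeping only
+ // the parts the new chunk does not cover, then append the new chunk's interval
+ // and re-insert it into start-sorted order below.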
+ for (VisibleInterval v : visibles) {
+ if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
+ newVisibles.add(new VisibleInterval(
+ v.start,
+ chunk.getOffset(),
+ v.fileId,
+ v.modifiedTime,
+ false
+ ));
+ }
+ long chunkStop = chunk.getOffset() + chunk.getSize();
+ if (v.start < chunkStop && chunkStop < v.stop) {
+ newVisibles.add(new VisibleInterval(
+ chunkStop,
+ v.stop,
+ v.fileId,
+ v.modifiedTime,
+ false
+ ));
+ }
+ if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
+ newVisibles.add(v);
+ }
+ }
+ newVisibles.add(newV);
+
+ // keep everything sorted
+ for (int i = newVisibles.size() - 1; i >= 0; i--) {
+ if (i > 0 && newV.start < newVisibles.get(i - 1).start) {
+ newVisibles.set(i, newVisibles.get(i - 1));
+ } else {
+ newVisibles.set(i, newV);
+ break;
+ }
+ }
+
+ return newVisibles;
+ }
+
+ public static String parseVolumeId(String fileId) {
+ int commaIndex = fileId.lastIndexOf(',');
+ if (commaIndex > 0) {
+ return fileId.substring(0, commaIndex);
+ }
+ return fileId;
+ }
+
+ public static long totalSize(List<FilerProto.FileChunk> chunksList) {
+ long size = 0;
+ for (FilerProto.FileChunk chunk : chunksList) {
+ long t = chunk.getOffset() + chunk.getSize();
+ if (size < t) {
+ size = t;
+ }
+ }
+ return size;
+ }
+
+ public static class VisibleInterval {
+ public final long start;
+ public final long stop;
+ public final long modifiedTime;
+ public final String fileId;
+ public final boolean isFullChunk;
+
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
+ this.start = start;
+ this.stop = stop;
+ this.modifiedTime = modifiedTime;
+ this.fileId = fileId;
+ this.isFullChunk = isFullChunk;
+ }
+
+ @Override
+ public String toString() {
+ return "VisibleInterval{" +
+ "start=" + start +
+ ", stop=" + stop +
+ ", modifiedTime=" + modifiedTime +
+ ", fileId='" + fileId + '\'' +
+ ", isFullChunk=" + isFullChunk +
+ '}';
+ }
+ }
+
+ public static class ChunkView {
+ public final String fileId;
+ public final long offset;
+ public final long size;
+ public final long logicOffset;
+ public final boolean isFullChunk;
+
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
+ this.fileId = fileId;
+ this.offset = offset;
+ this.size = size;
+ this.logicOffset = logicOffset;
+ this.isFullChunk = isFullChunk;
+ }
+
+ @Override
+ public String toString() {
+ return "ChunkView{" +
+ "fileId='" + fileId + '\'' +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", logicOffset=" + logicOffset +
+ ", isFullChunk=" + isFullChunk +
+ '}';
+ }
+ }
+
+}
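For context, the read path splits into a pure planning half and an I/O half: nonOverlappingVisibleIntervals flattens the filer's chunk list into non-overlapping visible intervals (newer mtime wins), viewFromVisibles cuts the requested range into ChunkViews, and read() then resolves volume locations over gRPC and fetches each view over HTTP. A sketch of the planning half only, not part of the commit, so it runs without a SeaweedFS cluster (the example class name is made up; the FilerProto classes are the ones generated for this client):

package seaweedfs.client;

import java.util.ArrayList;
import java.util.List;

public class ReadPlanExample {
    public static void main(String[] args) {
        List<FilerProto.FileChunk> chunks = new ArrayList<>();
        // Two writes: "4,bbb" is newer, so it overrides the tail of "3,aaa".
        chunks.add(FilerProto.FileChunk.newBuilder()
                .setFileId("3,aaa").setOffset(0).setSize(100).setMtime(1000).build());
        chunks.add(FilerProto.FileChunk.newBuilder()
                .setFileId("4,bbb").setOffset(50).setSize(100).setMtime(2000).build());

        List<SeaweedRead.VisibleInterval> visibles =
                SeaweedRead.nonOverlappingVisibleIntervals(chunks);
        // Expected intervals: [0,50) from "3,aaa" and [50,150) from "4,bbb".

        // Plan a read of 60 bytes starting at file offset 40.
        List<SeaweedRead.ChunkView> views = SeaweedRead.viewFromVisibles(visibles, 40, 60);
        for (SeaweedRead.ChunkView view : views) {
            System.out.println(view);
        }
        // read() would take these views plus a FilerGrpcClient, look up each
        // volume over gRPC, and issue one HTTP GET per view (with a Range
        // header for partial chunks).
    }
}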
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
new file mode 100644
index 000000000..a7cede09f
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -0,0 +1,86 @@
+package seaweedfs.client;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.mime.HttpMultipartMode;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SeaweedWrite {
+
+ public static void writeData(FilerProto.Entry.Builder entry,
+ final String replication,
+ final FilerGrpcClient filerGrpcClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength) throws IOException {
+ FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
+ FilerProto.AssignVolumeRequest.newBuilder()
+ .setCollection("")
+ .setReplication(replication)
+ .setDataCenter("")
+ .setReplication("")
+ .setTtlSec(0)
+ .build());
+ String fileId = response.getFileId();
+ String url = response.getUrl();
+ String targetUrl = String.format("http://%s/%s", url, fileId);
+
+ String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength);
+
+ entry.addChunks(FilerProto.FileChunk.newBuilder()
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+ .setMtime(System.currentTimeMillis() / 10000L)
+ .setETag(etag)
+ );
+
+ }
+
+ public static void writeMeta(final FilerGrpcClient filerGrpcClient,
+ final String parentDirectory, final FilerProto.Entry.Builder entry) {
+ filerGrpcClient.getBlockingStub().createEntry(
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parentDirectory)
+ .setEntry(entry)
+ .build()
+ );
+ }
+
+ private static String multipartUpload(String targetUrl,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength) throws IOException {
+
+ CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build();
+
+ InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+
+ HttpPost post = new HttpPost(targetUrl);
+
+ post.setEntity(MultipartEntityBuilder.create()
+ .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+ .addBinaryBody("upload", inputStream)
+ .build());
+
+ try {
+ HttpResponse response = client.execute(post);
+
+ Header etagHeader = response.getLastHeader("ETag");
+ String etag = etagHeader == null ? null : etagHeader.getValue();
+
+ if (etag != null && etag.startsWith("\"") && etag.endsWith("\"")) {
+ etag = etag.substring(1, etag.length() - 1);
+ }
+
+ return etag;
+ } finally {
+ client.close();
+ }
+
+ }
+}
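The write path is the mirror image: writeData asks the filer to assign a file id on a volume, uploads the bytes to that volume server as a multipart POST, and appends the resulting chunk to the entry builder; writeMeta then persists the entry through the filer. A hypothetical sketch, not part of the commit; the filer address, the FilerGrpcClient(host, grpcPort) constructor, the target directory, and the entry fields are all assumptions for illustration:

package seaweedfs.client;

import java.io.IOException;

public class WriteExample {
    public static void main(String[] args) throws IOException {
        // Assumed filer gRPC endpoint; adjust to a real deployment.
        FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);

        byte[] data = "hello seaweedfs".getBytes();

        // Minimal entry skeleton; attributes are omitted for brevity.
        FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder()
                .setName("hello.txt")
                .setIsDirectory(false);

        // Upload the bytes as one chunk at file offset 0 with replication "000",
        // which records a FileChunk on the entry builder...
        SeaweedWrite.writeData(entry, "000", filerGrpcClient, 0, data, 0, data.length);

        // ...then persist the entry under its parent directory.
        SeaweedWrite.writeMeta(filerGrpcClient, "/tmp", entry);
    }
}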
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
new file mode 100644
index 000000000..ccfcdb117
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -0,0 +1,63 @@
+package seaweedfs.client;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeaweedReadTest {
+
+ @Test
+ public void testNonOverlappingVisibleIntervals() {
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("aaa")
+ .setOffset(0)
+ .setSize(100)
+ .setMtime(1000)
+ .build());
+ chunks.add(FilerProto.FileChunk.newBuilder()
+ .setFileId("bbb")
+ .setOffset(100)
+ .setSize(133)
+ .setMtime(2000)
+ .build());
+
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
+ System.out.println("visible:" + visibleInterval);
+ }
+
+ Assert.assertEquals(visibleIntervals.size(), 2);
+
+ SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0);
+ Assert.assertEquals(visibleInterval.start, 0);
+ Assert.assertEquals(visibleInterval.stop, 100);
+ Assert.assertEquals(visibleInterval.modifiedTime, 1000);
+ Assert.assertEquals(visibleInterval.fileId, "aaa");
+
+ visibleInterval = visibleIntervals.get(1);
+ Assert.assertEquals(visibleInterval.start, 100);
+ Assert.assertEquals(visibleInterval.stop, 233);
+ Assert.assertEquals(visibleInterval.modifiedTime, 2000);
+ Assert.assertEquals(visibleInterval.fileId, "bbb");
+
+ List<SeaweedRead.ChunkView> chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233);
+
+ SeaweedRead.ChunkView chunkView = chunkViews.get(0);
+ Assert.assertEquals(chunkView.offset, 0);
+ Assert.assertEquals(chunkView.size, 100);
+ Assert.assertEquals(chunkView.logicOffset, 0);
+ Assert.assertEquals(chunkView.fileId, "aaa");
+
+ chunkView = chunkViews.get(1);
+ Assert.assertEquals(chunkView.offset, 0);
+ Assert.assertEquals(chunkView.size, 133);
+ Assert.assertEquals(chunkView.logicOffset, 100);
+ Assert.assertEquals(chunkView.fileId, "bbb");
+
+ }
+
+}