diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-12-27 23:40:23 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-12-27 23:40:23 -0800 |
| commit | 9123d799b86a26345d33187463f6d6ed78564fdb (patch) | |
| tree | 5d5699337a470be024bfd60f66166792b1123d4b /other/java/client | |
| parent | 319ab6d98ff6f2d17ebda87fc4cfe65bf02667ef (diff) | |
| download | seaweedfs-9123d799b86a26345d33187463f6d6ed78564fdb.tar.xz seaweedfs-9123d799b86a26345d33187463f6d6ed78564fdb.zip | |
refactor
Diffstat (limited to 'other/java/client')
5 files changed, 441 insertions, 0 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index db2634b2b..1ea39863f 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -50,6 +50,17 @@ <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>4.5.6</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> </dependencies> <distributionManagement> diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java new file mode 100644 index 000000000..efcc265de --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferOutputStream.java @@ -0,0 +1,21 @@ +package seaweedfs.client; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ByteBufferOutputStream extends OutputStream { + private final ByteBuffer buf; + + public ByteBufferOutputStream(ByteBuffer buf) { + this.buf = buf; + } + + public void write(int b) throws IOException { + this.buf.put((byte)b); + } + + public void write(byte[] b, int off, int len) throws IOException { + this.buf.put(b, off, len); + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java new file mode 100644 index 000000000..a906a689b --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -0,0 +1,260 @@ +package seaweedfs.client; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class SeaweedRead { + + // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + + // returns bytesRead + public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, + final long position, final byte[] buffer, final int bufferOffset, + final int bufferLength) { + + List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); + + FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); + for (ChunkView chunkView : chunkViews) { + String vid = parseVolumeId(chunkView.fileId); + lookupRequest.addVolumeIds(vid); + } + + FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + .getBlockingStub().lookupVolume(lookupRequest.build()); + + Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); + + //TODO parallel this + long readCount = 0; + int startOffset = bufferOffset; + for (ChunkView chunkView : chunkViews) { + FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); + if (locations.getLocationsCount() == 0) { + // log here! + return 0; + } + + int len = readChunkView(position, buffer, startOffset, chunkView, locations); + + readCount += len; + startOffset += len; + + } + + return readCount; + } + + private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) { + HttpClient client = HttpClientBuilder.create().build(); + HttpGet request = new HttpGet( + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + + if (!chunkView.isFullChunk) { + request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + request.setHeader(HttpHeaders.RANGE, + String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); + } + + try { + HttpResponse response = client.execute(request); + HttpEntity entity = response.getEntity(); + + int len = (int) (chunkView.logicOffset - position + chunkView.size); + OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); + entity.writeTo(outputStream); + // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); + + return len; + + } catch (IOException e) { + e.printStackTrace(); + } + return 0; + } + + public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { + List<ChunkView> views = new ArrayList<>(); + + long stop = offset + size; + for (VisibleInterval chunk : visibleIntervals) { + if (chunk.start <= offset && offset < chunk.stop && offset < stop) { + boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; + views.add(new ChunkView( + chunk.fileId, + offset - chunk.start, + Math.min(chunk.stop, stop) - offset, + offset, + isFullChunk + )); + offset = Math.min(chunk.stop, stop); + } + } + return views; + } + + public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) { + FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); + Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { + @Override + public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { + return (int) (a.getMtime() - b.getMtime()); + } + }); + + List<VisibleInterval> visibles = new ArrayList<>(); + for (FilerProto.FileChunk chunk : chunks) { + List<VisibleInterval> newVisibles = new ArrayList<>(); + visibles = mergeIntoVisibles(visibles, newVisibles, chunk); + } + + return visibles; + } + + private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles, + List<VisibleInterval> newVisibles, + FilerProto.FileChunk chunk) { + VisibleInterval newV = new VisibleInterval( + chunk.getOffset(), + chunk.getOffset() + chunk.getSize(), + chunk.getFileId(), + chunk.getMtime(), + true + ); + + // easy cases to speed up + if (visibles.size() == 0) { + visibles.add(newV); + return visibles; + } + if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) { + visibles.add(newV); + return visibles; + } + + for (VisibleInterval v : visibles) { + if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { + newVisibles.add(new VisibleInterval( + v.start, + chunk.getOffset(), + v.fileId, + v.modifiedTime, + false + )); + } + long chunkStop = chunk.getOffset() + chunk.getSize(); + if (v.start < chunkStop && chunkStop < v.stop) { + newVisibles.add(new VisibleInterval( + chunkStop, + v.stop, + v.fileId, + v.modifiedTime, + false + )); + } + if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { + newVisibles.add(v); + } + } + newVisibles.add(newV); + + // keep everything sorted + for (int i = newVisibles.size() - 1; i >= 0; i--) { + if (i > 0 && newV.start < newVisibles.get(i - 1).start) { + newVisibles.set(i, newVisibles.get(i - 1)); + } else { + newVisibles.set(i, newV); + break; + } + } + + return newVisibles; + } + + public static String parseVolumeId(String fileId) { + int commaIndex = fileId.lastIndexOf(','); + if (commaIndex > 0) { + return fileId.substring(0, commaIndex); + } + return fileId; + } + + public static long totalSize(List<FilerProto.FileChunk> chunksList) { + long size = 0; + for (FilerProto.FileChunk chunk : chunksList) { + long t = chunk.getOffset() + chunk.getSize(); + if (size < t) { + size = t; + } + } + return size; + } + + public static class VisibleInterval { + public final long start; + public final long stop; + public final long modifiedTime; + public final String fileId; + public final boolean isFullChunk; + + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { + this.start = start; + this.stop = stop; + this.modifiedTime = modifiedTime; + this.fileId = fileId; + this.isFullChunk = isFullChunk; + } + + @Override + public String toString() { + return "VisibleInterval{" + + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + + '}'; + } + } + + public static class ChunkView { + public final String fileId; + public final long offset; + public final long size; + public final long logicOffset; + public final boolean isFullChunk; + + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { + this.fileId = fileId; + this.offset = offset; + this.size = size; + this.logicOffset = logicOffset; + this.isFullChunk = isFullChunk; + } + + @Override + public String toString() { + return "ChunkView{" + + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + + '}'; + } + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java new file mode 100644 index 000000000..a7cede09f --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -0,0 +1,86 @@ +package seaweedfs.client; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.mime.HttpMultipartMode; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class SeaweedWrite { + + public static void writeData(FilerProto.Entry.Builder entry, + final String replication, + final FilerGrpcClient filerGrpcClient, + final long offset, + final byte[] bytes, + final long bytesOffset, final long bytesLength) throws IOException { + FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection("") + .setReplication(replication) + .setDataCenter("") + .setReplication("") + .setTtlSec(0) + .build()); + String fileId = response.getFileId(); + String url = response.getUrl(); + String targetUrl = String.format("http://%s/%s", url, fileId); + + String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength); + + entry.addChunks(FilerProto.FileChunk.newBuilder() + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) + ); + + } + + public static void writeMeta(final FilerGrpcClient filerGrpcClient, + final String parentDirectory, final FilerProto.Entry.Builder entry) { + filerGrpcClient.getBlockingStub().createEntry( + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parentDirectory) + .setEntry(entry) + .build() + ); + } + + private static String multipartUpload(String targetUrl, + final byte[] bytes, + final long bytesOffset, final long bytesLength) throws IOException { + + CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build(); + + InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + + HttpPost post = new HttpPost(targetUrl); + + post.setEntity(MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("upload", inputStream) + .build()); + + try { + HttpResponse response = client.execute(post); + + String etag = response.getLastHeader("ETag").getValue(); + + if (etag != null && etag.startsWith("\"") && etag.endsWith("\"")) { + etag = etag.substring(1, etag.length() - 1); + } + + return etag; + } finally { + client.close(); + } + + } +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java new file mode 100644 index 000000000..ccfcdb117 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java @@ -0,0 +1,63 @@ +package seaweedfs.client; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class SeaweedReadTest { + + @Test + public void testNonOverlappingVisibleIntervals() { + List<FilerProto.FileChunk> chunks = new ArrayList<>(); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("aaa") + .setOffset(0) + .setSize(100) + .setMtime(1000) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("bbb") + .setOffset(100) + .setSize(133) + .setMtime(2000) + .build()); + + List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + } + + Assert.assertEquals(visibleIntervals.size(), 2); + + SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); + Assert.assertEquals(visibleInterval.start, 0); + Assert.assertEquals(visibleInterval.stop, 100); + Assert.assertEquals(visibleInterval.modifiedTime, 1000); + Assert.assertEquals(visibleInterval.fileId, "aaa"); + + visibleInterval = visibleIntervals.get(1); + Assert.assertEquals(visibleInterval.start, 100); + Assert.assertEquals(visibleInterval.stop, 233); + Assert.assertEquals(visibleInterval.modifiedTime, 2000); + Assert.assertEquals(visibleInterval.fileId, "bbb"); + + List<SeaweedRead.ChunkView> chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233); + + SeaweedRead.ChunkView chunkView = chunkViews.get(0); + Assert.assertEquals(chunkView.offset, 0); + Assert.assertEquals(chunkView.size, 100); + Assert.assertEquals(chunkView.logicOffset, 0); + Assert.assertEquals(chunkView.fileId, "aaa"); + + chunkView = chunkViews.get(1); + Assert.assertEquals(chunkView.offset, 0); + Assert.assertEquals(chunkView.size, 133); + Assert.assertEquals(chunkView.logicOffset, 100); + Assert.assertEquals(chunkView.fileId, "bbb"); + + + } + +} |
