diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-12-27 23:40:23 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-12-27 23:40:23 -0800 |
| commit | 9123d799b86a26345d33187463f6d6ed78564fdb (patch) | |
| tree | 5d5699337a470be024bfd60f66166792b1123d4b /other/java/hdfs/src | |
| parent | 319ab6d98ff6f2d17ebda87fc4cfe65bf02667ef (diff) | |
| download | seaweedfs-9123d799b86a26345d33187463f6d6ed78564fdb.tar.xz seaweedfs-9123d799b86a26345d33187463f6d6ed78564fdb.zip | |
refactor
Diffstat (limited to 'other/java/hdfs/src')
7 files changed, 3 insertions, 436 deletions
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java deleted file mode 100644 index e9ea81f36..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java +++ /dev/null @@ -1,21 +0,0 @@ -package seaweed.hdfs; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -public class ByteBufferOutputStream extends OutputStream { - private final ByteBuffer buf; - - public ByteBufferOutputStream(ByteBuffer buf) { - this.buf = buf; - } - - public void write(int b) throws IOException { - this.buf.put((byte)b); - } - - public void write(byte[] b, int off, int len) throws IOException { - this.buf.put(b, off, len); - } -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index ffc109b20..27678e615 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory; import seaweedfs.client.FilerClient; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedRead; import java.io.FileNotFoundException; import java.io.IOException; diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java index c0b296fb9..90c14c772 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -10,6 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedRead; import java.io.EOFException; import java.io.IOException; diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 4dee4bf09..4f307ff96 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java deleted file mode 100644 index a4a2e9743..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java +++ /dev/null @@ -1,262 +0,0 @@ -package seaweed.hdfs; - -import org.apache.http.HttpEntity; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; - -public class SeaweedRead { - - // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); - - // returns bytesRead - public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, - final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength) { - - List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); - - FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); - for (ChunkView chunkView : chunkViews) { - String vid = parseVolumeId(chunkView.fileId); - lookupRequest.addVolumeIds(vid); - } - - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); - - Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); - - //TODO parallel this - long readCount = 0; - int startOffset = bufferOffset; - for (ChunkView chunkView : chunkViews) { - FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); - if (locations.getLocationsCount() == 0) { - // log here! - return 0; - } - - int len = readChunkView(position, buffer, startOffset, chunkView, locations); - - readCount += len; - startOffset += len; - - } - - return readCount; - } - - private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) { - HttpClient client = HttpClientBuilder.create().build(); - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); - - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); - } - - try { - HttpResponse response = client.execute(request); - HttpEntity entity = response.getEntity(); - - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); - - return len; - - } catch (IOException e) { - e.printStackTrace(); - } - return 0; - } - - public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { - List<ChunkView> views = new ArrayList<>(); - - long stop = offset + size; - for (VisibleInterval chunk : visibleIntervals) { - if (chunk.start <= offset && offset < chunk.stop && offset < stop) { - boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; - views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, - isFullChunk - )); - offset = Math.min(chunk.stop, stop); - } - } - return views; - } - - public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) { - FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); - Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { - @Override - public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - return (int) (a.getMtime() - b.getMtime()); - } - }); - - List<VisibleInterval> visibles = new ArrayList<>(); - for (FilerProto.FileChunk chunk : chunks) { - List<VisibleInterval> newVisibles = new ArrayList<>(); - visibles = mergeIntoVisibles(visibles, newVisibles, chunk); - } - - return visibles; - } - - private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles, - List<VisibleInterval> newVisibles, - FilerProto.FileChunk chunk) { - VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - true - ); - - // easy cases to speed up - if (visibles.size() == 0) { - visibles.add(newV); - return visibles; - } - if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) { - visibles.add(newV); - return visibles; - } - - for (VisibleInterval v : visibles) { - if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { - newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - false - )); - } - long chunkStop = chunk.getOffset() + chunk.getSize(); - if (v.start < chunkStop && chunkStop < v.stop) { - newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false - )); - } - if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { - newVisibles.add(v); - } - } - newVisibles.add(newV); - - // keep everything sorted - for (int i = newVisibles.size() - 1; i >= 0; i--) { - if (i > 0 && newV.start < newVisibles.get(i - 1).start) { - newVisibles.set(i, newVisibles.get(i - 1)); - } else { - newVisibles.set(i, newV); - break; - } - } - - return newVisibles; - } - - public static String parseVolumeId(String fileId) { - int commaIndex = fileId.lastIndexOf(','); - if (commaIndex > 0) { - return fileId.substring(0, commaIndex); - } - return fileId; - } - - public static long totalSize(List<FilerProto.FileChunk> chunksList) { - long size = 0; - for (FilerProto.FileChunk chunk : chunksList) { - long t = chunk.getOffset() + chunk.getSize(); - if (size < t) { - size = t; - } - } - return size; - } - - public static class VisibleInterval { - public final long start; - public final long stop; - public final long modifiedTime; - public final String fileId; - public final boolean isFullChunk; - - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { - this.start = start; - this.stop = stop; - this.modifiedTime = modifiedTime; - this.fileId = fileId; - this.isFullChunk = isFullChunk; - } - - @Override - public String toString() { - return "VisibleInterval{" + - "start=" + start + - ", stop=" + stop + - ", modifiedTime=" + modifiedTime + - ", fileId='" + fileId + '\'' + - ", isFullChunk=" + isFullChunk + - '}'; - } - } - - public static class ChunkView { - public final String fileId; - public final long offset; - public final long size; - public final long logicOffset; - public final boolean isFullChunk; - - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { - this.fileId = fileId; - this.offset = offset; - this.size = size; - this.logicOffset = logicOffset; - this.isFullChunk = isFullChunk; - } - - @Override - public String toString() { - return "ChunkView{" + - "fileId='" + fileId + '\'' + - ", offset=" + offset + - ", size=" + size + - ", logicOffset=" + logicOffset + - ", isFullChunk=" + isFullChunk + - '}'; - } - } - -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java deleted file mode 100644 index ee2131007..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java +++ /dev/null @@ -1,88 +0,0 @@ -package seaweed.hdfs; - -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.mime.HttpMultipartMode; -import org.apache.http.entity.mime.MultipartEntityBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; - -public class SeaweedWrite { - - public static void writeData(FilerProto.Entry.Builder entry, - final String replication, - final FilerGrpcClient filerGrpcClient, - final long offset, - final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { - FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) - .setDataCenter("") - .setReplication("") - .setTtlSec(0) - .build()); - String fileId = response.getFileId(); - String url = response.getUrl(); - String targetUrl = String.format("http://%s/%s", url, fileId); - - String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength); - - entry.addChunks(FilerProto.FileChunk.newBuilder() - .setFileId(fileId) - .setOffset(offset) - .setSize(bytesLength) - .setMtime(System.currentTimeMillis() / 10000L) - .setETag(etag) - ); - - } - - public static void writeMeta(final FilerGrpcClient filerGrpcClient, - final String parentDirectory, final FilerProto.Entry.Builder entry) { - filerGrpcClient.getBlockingStub().createEntry( - FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parentDirectory) - .setEntry(entry) - .build() - ); - } - - private static String multipartUpload(String targetUrl, - final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { - - CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build(); - - InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); - - HttpPost post = new HttpPost(targetUrl); - - post.setEntity(MultipartEntityBuilder.create() - .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) - .addBinaryBody("upload", inputStream) - .build()); - - try { - HttpResponse response = client.execute(post); - - String etag = response.getLastHeader("ETag").getValue(); - - if (etag != null && etag.startsWith("\"") && etag.endsWith("\"")) { - etag = etag.substring(1, etag.length() - 1); - } - - return etag; - } finally { - client.close(); - } - - } -} diff --git a/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java b/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java deleted file mode 100644 index e3ab97910..000000000 --- a/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package seaweed.hdfs; - -import org.junit.Test; -import seaweedfs.client.FilerProto; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class SeaweedReadTest { - - @Test - public void testNonOverlappingVisibleIntervals() { - List<FilerProto.FileChunk> chunks = new ArrayList<>(); - chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("aaa") - .setOffset(0) - .setSize(100) - .setMtime(1000) - .build()); - chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("bbb") - .setOffset(100) - .setSize(133) - .setMtime(2000) - .build()); - - List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); - for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { - System.out.println("visible:" + visibleInterval); - } - - assertEquals(visibleIntervals.size(), 2); - - SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); - assertEquals(visibleInterval.start, 0); - assertEquals(visibleInterval.stop, 100); - assertEquals(visibleInterval.modifiedTime, 1000); - assertEquals(visibleInterval.fileId, "aaa"); - - visibleInterval = visibleIntervals.get(1); - assertEquals(visibleInterval.start, 100); - assertEquals(visibleInterval.stop, 233); - assertEquals(visibleInterval.modifiedTime, 2000); - assertEquals(visibleInterval.fileId, "bbb"); - - List<SeaweedRead.ChunkView> chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233); - - SeaweedRead.ChunkView chunkView = chunkViews.get(0); - assertEquals(chunkView.offset, 0); - assertEquals(chunkView.size, 100); - assertEquals(chunkView.logicOffset, 0); - assertEquals(chunkView.fileId, "aaa"); - - chunkView = chunkViews.get(1); - assertEquals(chunkView.offset, 0); - assertEquals(chunkView.size, 133); - assertEquals(chunkView.logicOffset, 100); - assertEquals(chunkView.fileId, "bbb"); - - - } - -} |
