From 319ab6d98ff6f2d17ebda87fc4cfe65bf02667ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 27 Dec 2018 23:29:51 -0800 Subject: refactoring --- .../java/seaweed/hdfs/ByteBufferOutputStream.java | 21 +++++++ .../java/seaweed/hdfs/SeaweedOutputStream.java | 4 +- .../src/main/java/seaweed/hdfs/SeaweedRead.java | 7 +-- .../src/main/java/seaweed/hdfs/SeaweedWrite.java | 7 +-- .../test/java/seaweed/hdfs/SeaweedReadTest.java | 65 +++++++++++++++++++++ .../test/java/seaweedfs/hdfs/SeaweedReadTest.java | 66 ---------------------- 6 files changed, 93 insertions(+), 77 deletions(-) create mode 100644 other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java create mode 100644 other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java delete mode 100644 other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java (limited to 'other/java/hdfs/src') diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java new file mode 100644 index 000000000..e9ea81f36 --- /dev/null +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/ByteBufferOutputStream.java @@ -0,0 +1,21 @@ +package seaweed.hdfs; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ByteBufferOutputStream extends OutputStream { + private final ByteBuffer buf; + + public ByteBufferOutputStream(ByteBuffer buf) { + this.buf = buf; + } + + public void write(int b) throws IOException { + this.buf.put((byte)b); + } + + public void write(byte[] b, int off, int len) throws IOException { + this.buf.put(b, off, len); + } +} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 19894956a..4dee4bf09 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; + public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); @@ -79,7 +81,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); try { - SeaweedWrite.writeMeta(filerGrpcClient, path, entry); + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java index a39e0f59c..a4a2e9743 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java @@ -1,14 +1,11 @@ package seaweed.hdfs; -import org.apache.hadoop.hdfs.util.ByteBufferOutputStream; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; @@ -23,7 +20,7 @@ import java.util.Map; public class SeaweedRead { - private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, @@ -81,7 +78,7 @@ public class SeaweedRead { int len = (int) (chunkView.logicOffset - position + chunkView.size); OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); entity.writeTo(outputStream); - LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); + // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); return len; diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java index cfe0e1c24..ee2131007 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java @@ -1,6 +1,5 @@ package seaweed.hdfs; -import org.apache.hadoop.fs.Path; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; @@ -14,8 +13,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; - public class SeaweedWrite { public static void writeData(FilerProto.Entry.Builder entry, @@ -49,10 +46,10 @@ public class SeaweedWrite { } public static void writeMeta(final FilerGrpcClient filerGrpcClient, - final Path path, final FilerProto.Entry.Builder entry) { + final String parentDirectory, final FilerProto.Entry.Builder entry) { filerGrpcClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(getParentDirectory(path)) + .setDirectory(parentDirectory) .setEntry(entry) .build() ); diff --git a/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java b/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java new file mode 100644 index 000000000..e3ab97910 --- /dev/null +++ b/other/java/hdfs/src/test/java/seaweed/hdfs/SeaweedReadTest.java @@ -0,0 +1,65 @@ +package seaweed.hdfs; + +import org.junit.Test; +import seaweedfs.client.FilerProto; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class SeaweedReadTest { + + @Test + public void testNonOverlappingVisibleIntervals() { + List chunks = new ArrayList<>(); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("aaa") + .setOffset(0) + .setSize(100) + .setMtime(1000) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("bbb") + .setOffset(100) + .setSize(133) + .setMtime(2000) + .build()); + + List visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + } + + assertEquals(visibleIntervals.size(), 2); + + SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); + assertEquals(visibleInterval.start, 0); + assertEquals(visibleInterval.stop, 100); + assertEquals(visibleInterval.modifiedTime, 1000); + assertEquals(visibleInterval.fileId, "aaa"); + + visibleInterval = visibleIntervals.get(1); + assertEquals(visibleInterval.start, 100); + assertEquals(visibleInterval.stop, 233); + assertEquals(visibleInterval.modifiedTime, 2000); + assertEquals(visibleInterval.fileId, "bbb"); + + List chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233); + + SeaweedRead.ChunkView chunkView = chunkViews.get(0); + assertEquals(chunkView.offset, 0); + assertEquals(chunkView.size, 100); + assertEquals(chunkView.logicOffset, 0); + assertEquals(chunkView.fileId, "aaa"); + + chunkView = chunkViews.get(1); + assertEquals(chunkView.offset, 0); + assertEquals(chunkView.size, 133); + assertEquals(chunkView.logicOffset, 100); + assertEquals(chunkView.fileId, "bbb"); + + + } + +} diff --git a/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java b/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java deleted file mode 100644 index 4bb9efff5..000000000 --- a/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java +++ /dev/null @@ -1,66 +0,0 @@ -package seaweedfs.hdfs; - -import org.junit.Test; -import seaweed.hdfs.SeaweedRead; -import seaweedfs.client.FilerProto; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class SeaweedReadTest { - - @Test - public void testNonOverlappingVisibleIntervals() { - List chunks = new ArrayList<>(); - chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("aaa") - .setOffset(0) - .setSize(100) - .setMtime(1000) - .build()); - chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("bbb") - .setOffset(100) - .setSize(133) - .setMtime(2000) - .build()); - - List visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); - for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { - System.out.println("visible:" + visibleInterval); - } - - assertEquals(visibleIntervals.size(), 2); - - SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); - assertEquals(visibleInterval.start, 0); - assertEquals(visibleInterval.stop, 100); - assertEquals(visibleInterval.modifiedTime, 1000); - assertEquals(visibleInterval.fileId, "aaa"); - - visibleInterval = visibleIntervals.get(1); - assertEquals(visibleInterval.start, 100); - assertEquals(visibleInterval.stop, 233); - assertEquals(visibleInterval.modifiedTime, 2000); - assertEquals(visibleInterval.fileId, "bbb"); - - List chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233); - - SeaweedRead.ChunkView chunkView = chunkViews.get(0); - assertEquals(chunkView.offset, 0); - assertEquals(chunkView.size, 100); - assertEquals(chunkView.logicOffset, 0); - assertEquals(chunkView.fileId, "aaa"); - - chunkView = chunkViews.get(1); - assertEquals(chunkView.offset, 0); - assertEquals(chunkView.size, 133); - assertEquals(chunkView.logicOffset, 100); - assertEquals(chunkView.fileId, "bbb"); - - - } - -} -- cgit v1.2.3