diff options
Diffstat (limited to 'other/java/hdfs2')
| -rw-r--r-- | other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java | 25 | ||||
| -rw-r--r-- | other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 2 |
2 files changed, 26 insertions, 1 deletions
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java new file mode 100644 index 000000000..3d0b68a52 --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java @@ -0,0 +1,25 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable { + + public BufferedByteBufferReadableInputStream(FSInputStream in, int size) { + super(in, size); + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable"); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (this.in instanceof ByteBufferReadable) { + return ((ByteBufferReadable)this.in).read(buf); + } else { + throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); + } + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fb1f4c53b..84f11e846 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); + return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; |
