aboutsummaryrefslogtreecommitdiff
path: root/other
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-12-03 00:08:05 -0800
committerChris Lu <chris.lu@gmail.com>2020-12-03 00:08:05 -0800
commit4d2855476c35a2762a225c6707731067e84c71bc (patch)
tree30b0aab30e1c8188d352a51150f0a79651c3aded /other
parent3857f9c8404c6f251135268e42c79bb83c13b2b0 (diff)
downloadseaweedfs-4d2855476c35a2762a225c6707731067e84c71bc.tar.xz
seaweedfs-4d2855476c35a2762a225c6707731067e84c71bc.zip
Hadoop: add BufferedByteBufferReadableInputStream
fix https://github.com/chrislusf/seaweedfs/issues/1645
Diffstat (limited to 'other')
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java25
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java25
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java8
5 files changed, 53 insertions, 9 deletions
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
new file mode 100644
index 000000000..3d0b68a52
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
@@ -0,0 +1,25 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
+
+ public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
+ super(in, size);
+ if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+ throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (this.in instanceof ByteBufferReadable) {
+ return ((ByteBufferReadable)this.in).read(buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ }
+ }
+}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fb1f4c53b..84f11e846 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
+ return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
new file mode 100644
index 000000000..3d0b68a52
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
@@ -0,0 +1,25 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
+
+ public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
+ super(in, size);
+ if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+ throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (this.in instanceof ByteBufferReadable) {
+ return ((ByteBufferReadable)this.in).read(buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ }
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fb1f4c53b..84f11e846 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
+ return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 752f06374..690366849 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -65,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
}
@Override
- public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+ public int read(final byte[] b, final int off, final int len) throws IOException {
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}