aboutsummaryrefslogtreecommitdiff
path: root/other/java/hdfs3/src
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/hdfs3/src')
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java49
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java2
2 files changed, 50 insertions, 1 deletions
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java
new file mode 100644
index 000000000..0bee6e43f
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java
@@ -0,0 +1,49 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable {
+
+ SeaweedInputStream t;
+
+ protected BufferedSeaweedInputStream(InputStream in, int bufferSize) {
+ super(new BufferedInputStream(in, bufferSize));
+ t = (SeaweedInputStream)in;
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ return this.t.read(position,buffer,offset,length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ this.t.readFully(position,buffer,offset,length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ this.t.readFully(position,buffer);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ this.t.seek(pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.t.getPos();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return this.t.seekToNewSource(targetPos);
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 2341d335d..585fa39a3 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -76,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(inputStream);
+ return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;