aboutsummaryrefslogtreecommitdiff
path: root/other/java/hdfs2/src
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-12-01 17:20:31 -0800
committerChris Lu <chris.lu@gmail.com>2020-12-01 17:20:31 -0800
commita9efaa6385eb4a2e166cfbe8d8f5498d7dd95a91 (patch)
tree0608304ab68aa346f0f590139f33636ed714919a /other/java/hdfs2/src
parent04062c56c74591c32380bbdfb589a9c641e05fa5 (diff)
downloadseaweedfs-a9efaa6385eb4a2e166cfbe8d8f5498d7dd95a91.tar.xz
seaweedfs-a9efaa6385eb4a2e166cfbe8d8f5498d7dd95a91.zip
HDFS: implement ByteBufferReadable
fix https://github.com/chrislusf/seaweedfs/issues/1645
Diffstat (limited to 'other/java/hdfs2/src')
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java12
1 files changed, 10 insertions, 2 deletions
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 8e406206d..2cf544162 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,6 +2,7 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
-public class SeaweedInputStream extends FSInputStream {
+public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
@@ -85,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
}
long bytesRead = 0;
- if (position+len < entry.getContent().size()) {
+ if (position+len <= entry.getContent().size()) {
entry.getContent().copyTo(b, (int) position, (int) off, len);
} else {
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
@@ -106,6 +108,12 @@ public class SeaweedInputStream extends FSInputStream {
}
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ return read(buf.array(), buf.position(), buf.remaining());
+ }
+
/**
* Seek to given position in stream.
*