Diffstat (limited to 'other/java/client')
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java  192
1 file changed, 192 insertions(+), 0 deletions(-)
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
new file mode 100644
index 000000000..312e77aa2
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -0,0 +1,192 @@
+package seaweedfs.client;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class SeaweedInputStream extends InputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+ private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
+
+ private final FilerGrpcClient filerGrpcClient;
+ private final String path;
+ private final FilerProto.Entry entry;
+ private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
+ private final long contentLength;
+
+ private long position = 0; // cursor of the file
+
+ private boolean closed = false;
+
+ public SeaweedInputStream(
+ final FilerGrpcClient filerGrpcClient,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.filerGrpcClient = filerGrpcClient;
+ this.path = path;
+ this.entry = entry;
+ this.contentLength = SeaweedRead.fileSize(entry);
+
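+ // resolve the entry's chunks into non-overlapping visible intervals, so each byte range is read from the chunk that currently covers it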
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int numberOfBytesRead = read(b, 0, 1);
+ if (numberOfBytesRead < 0) {
+ return -1;
+ } else {
+ return (b[0] & 0xFF);
+ }
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+
+ if (b == null) {
+ throw new IllegalArgumentException("null byte array passed in to read() method");
+ }
+ if (off < 0 || off > b.length) {
+ throw new IllegalArgumentException("offset out of range of the buffer");
+ }
+ if (len < 0) {
+ throw new IllegalArgumentException("requested read length is less than zero");
+ }
+ if (len > (b.length - off)) {
+ throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ }
+
+ ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+ return read(buf);
+
+ }
+
+ // implement ByteBufferReadable
+ public synchronized int read(ByteBuffer buf) throws IOException {
+
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+
+ long bytesRead = 0;
+ int len = buf.remaining();
+ int start = (int) this.position;
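+ // serve the read from the content stored inline in the entry when the whole range is available there,
+ // otherwise read the range through SeaweedRead using the resolved visible intervals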
+ if (start + len <= entry.getContent().size()) {
+ entry.getContent().substring(start, start + len).copyTo(buf);
+ bytesRead = len;
+ } else {
+ bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ }
+
+ if (bytesRead > Integer.MAX_VALUE) {
+ throw new IOException("Unexpected Content-Length");
+ }
+
+ if (bytesRead > 0) {
+ this.position += bytesRead;
+ }
+
+ return (int) bytesRead;
+ }
+
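+ // seek only moves the in-memory cursor; the next read starts at the new offset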
+ public synchronized void seek(long n) throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ if (n < 0) {
+ throw new EOFException("Cannot seek to a negative offset");
+ }
+ if (n > contentLength) {
+ throw new EOFException("Attempted to seek or read past the end of the file");
+ }
+ this.position = n;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ if (this.position == contentLength) {
+ if (n > 0) {
+ throw new EOFException("Attempted to seek or read past the end of the file");
+ }
+ }
+ long newPos = this.position + n;
+ if (newPos < 0) {
+ newPos = 0;
+ n = newPos - this.position;
+ }
+ if (newPos > contentLength) {
+ newPos = contentLength;
+ n = newPos - this.position;
+ }
+ seek(newPos);
+ return n;
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ final long remaining = this.contentLength - this.position;
+ return remaining <= Integer.MAX_VALUE
+ ? (int) remaining : Integer.MAX_VALUE;
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ return contentLength;
+ }
+
+ public synchronized long getPos() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ return position;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ }
+
+}