aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-30 06:16:02 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-30 06:16:02 -0800
commit6f4aab51f9d7e8d9721c3c4fd7b57da7ab8b6d8e (patch)
tree3d434d68fb7ddd9c63fe597a680255bf2fc66c80
parent043c2d796098e1f88da1442eefdb9bd5df075708 (diff)
downloadseaweedfs-6f4aab51f9d7e8d9721c3c4fd7b57da7ab8b6d8e.tar.xz
seaweedfs-6f4aab51f9d7e8d9721c3c4fd7b57da7ab8b6d8e.zip
refactoring SeaweedInputStream
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java)93
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java4
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java2
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java150
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java150
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java259
7 files changed, 317 insertions, 343 deletions
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 690366849..312e77aa2 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -1,28 +1,22 @@
-package seaweed.hdfs;
+package seaweedfs.client;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
-public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
+public class SeaweedInputStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+ private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
private final String path;
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
@@ -34,11 +28,9 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
final String path,
final FilerProto.Entry entry) throws IOException {
this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry);
@@ -86,7 +78,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
}
// implement ByteBufferReadable
- @Override
public synchronized int read(ByteBuffer buf) throws IOException {
if (position < 0) {
@@ -111,45 +102,32 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
if (bytesRead > 0) {
this.position += bytesRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
}
return (int) bytesRead;
}
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
public synchronized void seek(long n) throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ throw new EOFException("Cannot seek to a negative offset");
}
if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ throw new EOFException("Attempted to seek or read past the end of the file");
}
-
this.position = n;
-
}
@Override
public synchronized long skip(long n) throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
if (this.position == contentLength) {
if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ throw new EOFException("Attempted to seek or read past the end of the file");
}
}
long newPos = this.position + n;
@@ -177,10 +155,9 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
@Override
public synchronized int available() throws IOException {
if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
- final long remaining = this.contentLength - this.getPos();
+ final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
}
@@ -195,65 +172,21 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
*/
public long length() throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
return contentLength;
}
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
public synchronized long getPos() throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
return position;
}
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
@Override
public synchronized void close() throws IOException {
closed = true;
}
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
}
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
index 0529a5c73..317327be4 100644
--- a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
@@ -1,6 +1,6 @@
package com.seaweedfs.examples;
-import seaweed.hdfs.SeaweedInputStream;
+import seaweed.hdfs.SeaweedHadoopInputStream;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
@@ -24,7 +24,7 @@ public class UnzipFile {
long localProcessTime = startTime2 - startTime;
- SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
+ SeaweedHadoopInputStream seaweedInputStream = new SeaweedHadoopInputStream(
filerGrpcClient,
new org.apache.hadoop.fs.FileSystem.Statistics(""),
"/",
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 719e52579..223036c13 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -230,7 +230,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path);
}
- return new SeaweedInputStream(filerGrpcClient,
+ return new SeaweedHadoopInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
entry);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..dd9bf4032
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+ private final SeaweedInputStream seaweedInputStream;
+ private final Statistics statistics;
+
+ public SeaweedHadoopInputStream(
+ final FilerGrpcClient filerGrpcClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry);
+ this.statistics = statistics;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seaweedInputStream.read();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return seaweedInputStream.read(b, off, len);
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int bytesRead = seaweedInputStream.read(buf);
+
+ if (bytesRead > 0) {
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ seaweedInputStream.seek(n);
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ return seaweedInputStream.skip(n);
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ return seaweedInputStream.available();
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they wont be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ return seaweedInputStream.length();
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return seaweedInputStream.getPos();
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ seaweedInputStream.close();
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 719e52579..223036c13 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -230,7 +230,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path);
}
- return new SeaweedInputStream(filerGrpcClient,
+ return new SeaweedHadoopInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
entry);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..dd9bf4032
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+ private final SeaweedInputStream seaweedInputStream;
+ private final Statistics statistics;
+
+ public SeaweedHadoopInputStream(
+ final FilerGrpcClient filerGrpcClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry);
+ this.statistics = statistics;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seaweedInputStream.read();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return seaweedInputStream.read(b, off, len);
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int bytesRead = seaweedInputStream.read(buf);
+
+ if (bytesRead > 0) {
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ seaweedInputStream.seek(n);
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ return seaweedInputStream.skip(n);
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ return seaweedInputStream.available();
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they wont be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ return seaweedInputStream.length();
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return seaweedInputStream.getPos();
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ seaweedInputStream.close();
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
deleted file mode 100644
index 690366849..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ /dev/null
@@ -1,259 +0,0 @@
-package seaweed.hdfs;
-
-// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
- private final String path;
- private final FilerProto.Entry entry;
- private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
- private final long contentLength;
-
- private long position = 0; // cursor of the file
-
- private boolean closed = false;
-
- public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry) throws IOException {
- this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
- this.path = path;
- this.entry = entry;
- this.contentLength = SeaweedRead.fileSize(entry);
-
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
-
- LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
-
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- int numberOfBytesRead = read(b, 0, 1);
- if (numberOfBytesRead < 0) {
- return -1;
- } else {
- return (b[0] & 0xFF);
- }
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
-
- if (b == null) {
- throw new IllegalArgumentException("null byte array passed in to read() method");
- }
- if (off >= b.length) {
- throw new IllegalArgumentException("offset greater than length of array");
- }
- if (len < 0) {
- throw new IllegalArgumentException("requested read length is less than zero");
- }
- if (len > (b.length - off)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
- }
-
- ByteBuffer buf = ByteBuffer.wrap(b, off, len);
- return read(buf);
-
- }
-
- // implement ByteBufferReadable
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
-
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
-
- long bytesRead = 0;
- int len = buf.remaining();
- int start = (int) this.position;
- if (start+len <= entry.getContent().size()) {
- entry.getContent().substring(start, start+len).copyTo(buf);
- } else {
- bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
- }
-
- if (bytesRead > Integer.MAX_VALUE) {
- throw new IOException("Unexpected Content-Length");
- }
-
- if (bytesRead > 0) {
- this.position += bytesRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
- }
-
- return (int) bytesRead;
- }
-
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
- public synchronized void seek(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
- }
- if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
-
- this.position = n;
-
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (this.position == contentLength) {
- if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
- }
- long newPos = this.position + n;
- if (newPos < 0) {
- newPos = 0;
- n = newPos - this.position;
- }
- if (newPos > contentLength) {
- newPos = contentLength;
- n = newPos - this.position;
- }
- seek(newPos);
- return n;
- }
-
- /**
- * Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- * <p>
- * This is to match the behavior of DFSInputStream.available(),
- * which some clients may rely on (HBase write-ahead log reading in
- * particular).
- */
- @Override
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
- }
- final long remaining = this.contentLength - this.getPos();
- return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
- }
-
- /**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
- * they wont be reflected in the returned length.
- *
- * @return length of the file.
- * @throws IOException if the stream is closed
- */
- public long length() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return contentLength;
- }
-
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public synchronized long getPos() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return position;
- }
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- closed = true;
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-}