author    Chris Lu <chris.lu@gmail.com>  2020-08-03 09:06:09 -0700
committer Chris Lu <chris.lu@gmail.com>  2020-08-03 09:06:09 -0700
commit    b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6
tree      b8e574444682d9650ec2099938658f81433b215d
parent    53190a997261a1d2a1d98d1f64eaea2d6a82124e
add read ahead input stream
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java | 404
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java          |   3
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java          |   3
3 files changed, 408 insertions(+), 2 deletions(-)
diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java
new file mode 100644
index 000000000..52c7ac09c
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java
@@ -0,0 +1,404 @@
+package seaweedfs.client;
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream while the caller consumes data from the current buffer. It does this by maintaining
+ * two buffers: an active buffer and a read ahead buffer. The active buffer contains data that
+ * should be returned when a read() call is issued. The read ahead buffer is used to
+ * asynchronously read from the underlying input stream; once the current active buffer is
+ * exhausted, the two buffers are flipped so that reading can continue from the read ahead
+ * buffer without being blocked in disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
+
+ private ReentrantLock stateChangeLock = new ReentrantLock();
+
+ @GuardedBy("stateChangeLock")
+ private ByteBuffer activeBuffer;
+
+ @GuardedBy("stateChangeLock")
+ private ByteBuffer readAheadBuffer;
+
+ @GuardedBy("stateChangeLock")
+ private boolean endOfStream;
+
+ @GuardedBy("stateChangeLock")
+ // true if async read is in progress
+ private boolean readInProgress;
+
+ @GuardedBy("stateChangeLock")
+ // true if read is aborted due to an exception in reading from underlying input stream.
+ private boolean readAborted;
+
+ @GuardedBy("stateChangeLock")
+ private Throwable readException;
+
+ @GuardedBy("stateChangeLock")
+ // whether the close method is called.
+ private boolean isClosed;
+
+ @GuardedBy("stateChangeLock")
+ // true when the close method will close the underlying input stream. This is valid only if
+ // `isClosed` is true.
+ private boolean isUnderlyingInputStreamBeingClosed;
+
+ @GuardedBy("stateChangeLock")
+ // whether there is a read ahead task running.
+ private boolean isReading;
+
+ // whether there is a reader waiting for data.
+ private AtomicBoolean isWaiting = new AtomicBoolean(false);
+
+ private final InputStream underlyingInputStream;
+
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahead").build()
+ );
+
+ private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+ private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+ /**
+ * Creates a <code>ReadAheadInputStream</code> with the specified buffer size. A read-ahead into
+ * the second buffer is triggered whenever the active buffer is exhausted.
+ *
+ * @param inputStream The underlying input stream.
+ * @param bufferSizeInBytes The buffer size.
+ */
+ public ReadAheadInputStream(
+ InputStream inputStream, int bufferSizeInBytes) {
+ Preconditions.checkArgument(bufferSizeInBytes > 0,
+ "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
+ activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+ readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+ this.underlyingInputStream = inputStream;
+ activeBuffer.flip();
+ readAheadBuffer.flip();
+ }
+
+ private boolean isEndOfStream() {
+ return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
+ }
+
+ private void checkReadException() throws IOException {
+ if (readAborted) {
+ Throwables.propagateIfPossible(readException, IOException.class);
+ throw new IOException(readException);
+ }
+ }
+
+ /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
+ private void readAsync() throws IOException {
+ stateChangeLock.lock();
+ final byte[] arr = readAheadBuffer.array();
+ try {
+ if (endOfStream || readInProgress) {
+ return;
+ }
+ checkReadException();
+ readAheadBuffer.position(0);
+ readAheadBuffer.flip();
+ readInProgress = true;
+ } finally {
+ stateChangeLock.unlock();
+ }
+ executorService.execute(() -> {
+ stateChangeLock.lock();
+ try {
+ if (isClosed) {
+ readInProgress = false;
+ return;
+ }
+ // Flip this so that the close method will not close the underlying input stream when we
+ // are reading.
+ isReading = true;
+ } finally {
+ stateChangeLock.unlock();
+ }
+
+ // Please note that it is safe to release the lock and read into the read ahead buffer
+ // because either of following two conditions will hold - 1. The active buffer has
+ // data available to read so the reader will not read from the read ahead buffer.
+ // 2. This is the first time read is called or the active buffer is exhausted,
+ // in that case the reader waits for this async read to complete.
+ // So there is no race condition in both the situations.
+ int read = 0;
+ int off = 0, len = arr.length;
+ Throwable exception = null;
+ try {
+ // try to fill the read ahead buffer.
+ // if a reader is waiting, possibly return early.
+ do {
+ read = underlyingInputStream.read(arr, off, len);
+ if (read <= 0) break;
+ off += read;
+ len -= read;
+ } while (len > 0 && !isWaiting.get());
+ } catch (Throwable ex) {
+ exception = ex;
+ if (ex instanceof Error) {
+ // `readException` may not be reported to the user. Rethrow Error to make sure at least
+ // the user can see the Error in the UncaughtExceptionHandler.
+ throw (Error) ex;
+ }
+ } finally {
+ stateChangeLock.lock();
+ readAheadBuffer.limit(off);
+ if (read < 0 || (exception instanceof EOFException)) {
+ endOfStream = true;
+ } else if (exception != null) {
+ readAborted = true;
+ readException = exception;
+ }
+ readInProgress = false;
+ signalAsyncReadComplete();
+ stateChangeLock.unlock();
+ closeUnderlyingInputStreamIfNecessary();
+ }
+ });
+ }
+
+ private void closeUnderlyingInputStreamIfNecessary() {
+ boolean needToCloseUnderlyingInputStream = false;
+ stateChangeLock.lock();
+ try {
+ isReading = false;
+ if (isClosed && !isUnderlyingInputStreamBeingClosed) {
+ // close method cannot close underlyingInputStream because we were reading.
+ needToCloseUnderlyingInputStream = true;
+ }
+ } finally {
+ stateChangeLock.unlock();
+ }
+ if (needToCloseUnderlyingInputStream) {
+ try {
+ underlyingInputStream.close();
+ } catch (IOException e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ private void signalAsyncReadComplete() {
+ stateChangeLock.lock();
+ try {
+ asyncReadComplete.signalAll();
+ } finally {
+ stateChangeLock.unlock();
+ }
+ }
+
+ private void waitForAsyncReadComplete() throws IOException {
+ stateChangeLock.lock();
+ isWaiting.set(true);
+ try {
+ // There is only one reader, and one writer, so the writer should signal only once,
+ // but a while loop checking the wake up condition is still needed to avoid spurious wakeups.
+ while (readInProgress) {
+ asyncReadComplete.await();
+ }
+ } catch (InterruptedException e) {
+ InterruptedIOException iio = new InterruptedIOException(e.getMessage());
+ iio.initCause(e);
+ throw iio;
+ } finally {
+ isWaiting.set(false);
+ stateChangeLock.unlock();
+ }
+ checkReadException();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (activeBuffer.hasRemaining()) {
+ // short path - just get one byte.
+ return activeBuffer.get() & 0xFF;
+ } else {
+ byte[] oneByteArray = oneByte.get();
+ return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int offset, int len) throws IOException {
+ if (offset < 0 || len < 0 || len > b.length - offset) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+
+ if (!activeBuffer.hasRemaining()) {
+ // Nothing remaining in the active buffer - lock and switch to the read ahead buffer.
+ stateChangeLock.lock();
+ try {
+ waitForAsyncReadComplete();
+ if (!readAheadBuffer.hasRemaining()) {
+ // The first read.
+ readAsync();
+ waitForAsyncReadComplete();
+ if (isEndOfStream()) {
+ return -1;
+ }
+ }
+ // Swap the newly filled read ahead buffer in place of the empty active buffer.
+ swapBuffers();
+ // After swapping buffers, trigger another async read for read ahead buffer.
+ readAsync();
+ } finally {
+ stateChangeLock.unlock();
+ }
+ }
+ len = Math.min(len, activeBuffer.remaining());
+ activeBuffer.get(b, offset, len);
+
+ return len;
+ }
+
+ /**
+ * flip the active and read ahead buffer
+ */
+ private void swapBuffers() {
+ ByteBuffer temp = activeBuffer;
+ activeBuffer = readAheadBuffer;
+ readAheadBuffer = temp;
+ }
+
+ @Override
+ public int available() throws IOException {
+ stateChangeLock.lock();
+ // Make sure we have no integer overflow.
+ try {
+ return (int) Math.min((long) Integer.MAX_VALUE,
+ (long) activeBuffer.remaining() + readAheadBuffer.remaining());
+ } finally {
+ stateChangeLock.unlock();
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0L) {
+ return 0L;
+ }
+ if (n <= activeBuffer.remaining()) {
+ // Skipping from the active buffer alone is sufficient.
+ activeBuffer.position((int) n + activeBuffer.position());
+ return n;
+ }
+ stateChangeLock.lock();
+ long skipped;
+ try {
+ skipped = skipInternal(n);
+ } finally {
+ stateChangeLock.unlock();
+ }
+ return skipped;
+ }
+
+ /**
+ * Internal skip function which should be called only from skip() api. The assumption is that
+ * the stateChangeLock is already acquired in the caller before calling this function.
+ */
+ private long skipInternal(long n) throws IOException {
+ assert (stateChangeLock.isLocked());
+ waitForAsyncReadComplete();
+ if (isEndOfStream()) {
+ return 0;
+ }
+ if (available() >= n) {
+ // we can skip from the internal buffers
+ int toSkip = (int) n;
+ // We need to skip from both active buffer and read ahead buffer
+ toSkip -= activeBuffer.remaining();
+ assert(toSkip > 0); // skipping from activeBuffer already handled.
+ activeBuffer.position(0);
+ activeBuffer.flip();
+ readAheadBuffer.position(toSkip + readAheadBuffer.position());
+ swapBuffers();
+ // Trigger an async read into the emptied read ahead buffer.
+ readAsync();
+ return n;
+ } else {
+ int skippedBytes = available();
+ long toSkip = n - skippedBytes;
+ activeBuffer.position(0);
+ activeBuffer.flip();
+ readAheadBuffer.position(0);
+ readAheadBuffer.flip();
+ long skippedFromInputStream = underlyingInputStream.skip(toSkip);
+ readAsync();
+ return skippedBytes + skippedFromInputStream;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ boolean isSafeToCloseUnderlyingInputStream = false;
+ stateChangeLock.lock();
+ try {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ if (!isReading) {
+ // Nobody is reading, so we can close the underlying input stream in this method.
+ isSafeToCloseUnderlyingInputStream = true;
+ // Flip this to make sure the read ahead task will not close the underlying input stream.
+ isUnderlyingInputStreamBeingClosed = true;
+ }
+ } finally {
+ stateChangeLock.unlock();
+ }
+
+ try {
+ executorService.shutdownNow();
+ executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ InterruptedIOException iio = new InterruptedIOException(e.getMessage());
+ iio.initCause(e);
+ throw iio;
+ } finally {
+ if (isSafeToCloseUnderlyingInputStream) {
+ underlyingInputStream.close();
+ }
+ }
+ }
+}
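
The class above is a double-buffered wrapper: the caller drains the active buffer while a single daemon thread fills the read ahead buffer, and the two are flipped when the active one runs dry. A minimal usage sketch, not part of this patch (the file path and the 4 MB buffer size are illustrative only):

import seaweedfs.client.ReadAheadInputStream;

import java.io.FileInputStream;
import java.io.InputStream;

public class ReadAheadExample {
    public static void main(String[] args) throws Exception {
        // Wrap any InputStream; the background thread prefetches into the second
        // buffer while this loop drains the first one.
        try (InputStream in = new ReadAheadInputStream(
                new FileInputStream("/tmp/example.dat"), 4 * 1024 * 1024)) {
            byte[] buf = new byte[8192];
            long total = 0;
            int n;
            while ((n = in.read(buf)) != -1) {
                total += n; // consume the data
            }
            System.out.println("read " + total + " bytes");
        }
    }
}

Closing the wrapper shuts down the single-thread executor and, once no read-ahead task is still running, closes the underlying stream as well.
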
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fd8877806..836bb4db5 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerProto;
+import seaweedfs.client.ReadAheadInputStream;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
+ return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fd8877806..836bb4db5 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerProto;
+import seaweedfs.client.ReadAheadInputStream;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
+ return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
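
The hdfs2 and hdfs3 changes are identical: the 16 MB BufferedFSInputStream wrapper is replaced with the new ReadAheadInputStream before the stream is handed to FSDataInputStream, so prefetching happens on a background thread instead of a plain synchronous buffer. A hedged sketch of a Hadoop client exercising that open() path; the scheme, filer address, and file path below are assumptions, not taken from this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeaweedReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical filer endpoint; depending on the deployment, fs.seaweedfs.impl
        // may also need to point at seaweed.hdfs.SeaweedFileSystem.
        conf.set("fs.defaultFS", "seaweedfs://localhost:8888");

        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/buckets/example/large-file.bin"))) {
            byte[] buf = new byte[64 * 1024];
            long total = 0;
            int n;
            while ((n = in.read(buf)) != -1) { // sequential reads benefit from the read-ahead
                total += n;
            }
            System.out.println("read " + total + " bytes");
        }
    }
}
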