author     Chris Lu <chris.lu@gmail.com>  2020-08-03 09:11:24 -0700
committer  Chris Lu <chris.lu@gmail.com>  2020-08-03 09:11:24 -0700
commit     15dc0a704db7aa542471b56f10ceb749dc041b12 (patch)
tree       c423081dc295a224046e6778be0858d4c2d6fbb8 /other
parent     b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6 (diff)
Revert "add read ahead input stream"
This reverts commit b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6.
Diffstat (limited to 'other')
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java  404
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java            3
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java            3
3 files changed, 2 insertions, 408 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java
deleted file mode 100644
index 52c7ac09c..000000000
--- a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java
+++ /dev/null
@@ -1,404 +0,0 @@
-package seaweedfs.client;
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// package org.apache.spark.io;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
- * stream when a specified amount of data has been read from the current buffer. It does this by
- * maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains
- * data that should be returned when a read() call is issued. The read ahead buffer is used to
- * asynchronously read from the underlying input stream; once the current active buffer is
- * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer
- * without being blocked on disk I/O.
- */
-public class ReadAheadInputStream extends InputStream {
-
- private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
-
- private ReentrantLock stateChangeLock = new ReentrantLock();
-
- @GuardedBy("stateChangeLock")
- private ByteBuffer activeBuffer;
-
- @GuardedBy("stateChangeLock")
- private ByteBuffer readAheadBuffer;
-
- @GuardedBy("stateChangeLock")
- private boolean endOfStream;
-
- @GuardedBy("stateChangeLock")
- // true if async read is in progress
- private boolean readInProgress;
-
- @GuardedBy("stateChangeLock")
- // true if read is aborted due to an exception in reading from underlying input stream.
- private boolean readAborted;
-
- @GuardedBy("stateChangeLock")
- private Throwable readException;
-
- @GuardedBy("stateChangeLock")
- // whether the close method is called.
- private boolean isClosed;
-
- @GuardedBy("stateChangeLock")
- // true when the close method will close the underlying input stream. This is valid only if
- // `isClosed` is true.
- private boolean isUnderlyingInputStreamBeingClosed;
-
- @GuardedBy("stateChangeLock")
- // whether there is a read ahead task running.
- private boolean isReading;
-
- // whether there is a reader waiting for data.
- private AtomicBoolean isWaiting = new AtomicBoolean(false);
-
- private final InputStream underlyingInputStream;
-
- private final ExecutorService executorService = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahead").build()
- );
-
- private final Condition asyncReadComplete = stateChangeLock.newCondition();
-
- private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
-
- /**
- * Creates a <code>ReadAheadInputStream</code> with the specified buffer size.
- *
- * @param inputStream The underlying input stream.
- * @param bufferSizeInBytes The buffer size.
- */
- public ReadAheadInputStream(
- InputStream inputStream, int bufferSizeInBytes) {
- Preconditions.checkArgument(bufferSizeInBytes > 0,
- "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
- activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
- readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
- this.underlyingInputStream = inputStream;
- activeBuffer.flip();
- readAheadBuffer.flip();
- }
-
- private boolean isEndOfStream() {
- return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
- }
-
- private void checkReadException() throws IOException {
- if (readAborted) {
- Throwables.propagateIfPossible(readException, IOException.class);
- throw new IOException(readException);
- }
- }
-
- /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
- private void readAsync() throws IOException {
- stateChangeLock.lock();
- final byte[] arr = readAheadBuffer.array();
- try {
- if (endOfStream || readInProgress) {
- return;
- }
- checkReadException();
- readAheadBuffer.position(0);
- readAheadBuffer.flip();
- readInProgress = true;
- } finally {
- stateChangeLock.unlock();
- }
- executorService.execute(() -> {
- stateChangeLock.lock();
- try {
- if (isClosed) {
- readInProgress = false;
- return;
- }
- // Flip this so that the close method will not close the underlying input stream when we
- // are reading.
- isReading = true;
- } finally {
- stateChangeLock.unlock();
- }
-
- // Please note that it is safe to release the lock and read into the read ahead buffer
- // because either of the following two conditions will hold:
- // 1. The active buffer has data available to read, so the reader will not read from the
- // read ahead buffer.
- // 2. This is the first read or the active buffer is exhausted, in which case the reader
- // waits for this async read to complete.
- // So there is no race condition in either situation.
- int read = 0;
- int off = 0, len = arr.length;
- Throwable exception = null;
- try {
- // try to fill the read ahead buffer.
- // if a reader is waiting, possibly return early.
- do {
- read = underlyingInputStream.read(arr, off, len);
- if (read <= 0) break;
- off += read;
- len -= read;
- } while (len > 0 && !isWaiting.get());
- } catch (Throwable ex) {
- exception = ex;
- if (ex instanceof Error) {
- // `readException` may not be reported to the user. Rethrow Error to make sure at least
- // the user can see the Error in the UncaughtExceptionHandler.
- throw (Error) ex;
- }
- } finally {
- stateChangeLock.lock();
- readAheadBuffer.limit(off);
- if (read < 0 || (exception instanceof EOFException)) {
- endOfStream = true;
- } else if (exception != null) {
- readAborted = true;
- readException = exception;
- }
- readInProgress = false;
- signalAsyncReadComplete();
- stateChangeLock.unlock();
- closeUnderlyingInputStreamIfNecessary();
- }
- });
- }
-
- private void closeUnderlyingInputStreamIfNecessary() {
- boolean needToCloseUnderlyingInputStream = false;
- stateChangeLock.lock();
- try {
- isReading = false;
- if (isClosed && !isUnderlyingInputStreamBeingClosed) {
- // close method cannot close underlyingInputStream because we were reading.
- needToCloseUnderlyingInputStream = true;
- }
- } finally {
- stateChangeLock.unlock();
- }
- if (needToCloseUnderlyingInputStream) {
- try {
- underlyingInputStream.close();
- } catch (IOException e) {
- logger.warn(e.getMessage(), e);
- }
- }
- }
-
- private void signalAsyncReadComplete() {
- stateChangeLock.lock();
- try {
- asyncReadComplete.signalAll();
- } finally {
- stateChangeLock.unlock();
- }
- }
-
- private void waitForAsyncReadComplete() throws IOException {
- stateChangeLock.lock();
- isWaiting.set(true);
- try {
- // There is only one reader, and one writer, so the writer should signal only once,
- // but a while loop checking the wake up condition is still needed to avoid spurious wakeups.
- while (readInProgress) {
- asyncReadComplete.await();
- }
- } catch (InterruptedException e) {
- InterruptedIOException iio = new InterruptedIOException(e.getMessage());
- iio.initCause(e);
- throw iio;
- } finally {
- isWaiting.set(false);
- stateChangeLock.unlock();
- }
- checkReadException();
- }
-
- @Override
- public int read() throws IOException {
- if (activeBuffer.hasRemaining()) {
- // short path - just get one byte.
- return activeBuffer.get() & 0xFF;
- } else {
- byte[] oneByteArray = oneByte.get();
- return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
- }
- }
-
- @Override
- public int read(byte[] b, int offset, int len) throws IOException {
- if (offset < 0 || len < 0 || len > b.length - offset) {
- throw new IndexOutOfBoundsException();
- }
- if (len == 0) {
- return 0;
- }
-
- if (!activeBuffer.hasRemaining()) {
- // No data remaining in the active buffer - lock and switch to the read ahead buffer.
- stateChangeLock.lock();
- try {
- waitForAsyncReadComplete();
- if (!readAheadBuffer.hasRemaining()) {
- // The first read.
- readAsync();
- waitForAsyncReadComplete();
- if (isEndOfStream()) {
- return -1;
- }
- }
- // Swap the newly read read ahead buffer in place of empty active buffer.
- swapBuffers();
- // After swapping buffers, trigger another async read for read ahead buffer.
- readAsync();
- } finally {
- stateChangeLock.unlock();
- }
- }
- len = Math.min(len, activeBuffer.remaining());
- activeBuffer.get(b, offset, len);
-
- return len;
- }
-
- /**
- * flip the active and read ahead buffer
- */
- private void swapBuffers() {
- ByteBuffer temp = activeBuffer;
- activeBuffer = readAheadBuffer;
- readAheadBuffer = temp;
- }
-
- @Override
- public int available() throws IOException {
- stateChangeLock.lock();
- // Make sure we have no integer overflow.
- try {
- return (int) Math.min((long) Integer.MAX_VALUE,
- (long) activeBuffer.remaining() + readAheadBuffer.remaining());
- } finally {
- stateChangeLock.unlock();
- }
- }
-
- @Override
- public long skip(long n) throws IOException {
- if (n <= 0L) {
- return 0L;
- }
- if (n <= activeBuffer.remaining()) {
- // Only skipping from active buffer is sufficient
- activeBuffer.position((int) n + activeBuffer.position());
- return n;
- }
- stateChangeLock.lock();
- long skipped;
- try {
- skipped = skipInternal(n);
- } finally {
- stateChangeLock.unlock();
- }
- return skipped;
- }
-
- /**
- * Internal skip function which should be called only from skip() api. The assumption is that
- * the stateChangeLock is already acquired in the caller before calling this function.
- */
- private long skipInternal(long n) throws IOException {
- assert (stateChangeLock.isLocked());
- waitForAsyncReadComplete();
- if (isEndOfStream()) {
- return 0;
- }
- if (available() >= n) {
- // we can skip from the internal buffers
- int toSkip = (int) n;
- // We need to skip from both active buffer and read ahead buffer
- toSkip -= activeBuffer.remaining();
- assert(toSkip > 0); // skipping from activeBuffer already handled.
- activeBuffer.position(0);
- activeBuffer.flip();
- readAheadBuffer.position(toSkip + readAheadBuffer.position());
- swapBuffers();
- // Trigger async read to emptied read ahead buffer.
- readAsync();
- return n;
- } else {
- int skippedBytes = available();
- long toSkip = n - skippedBytes;
- activeBuffer.position(0);
- activeBuffer.flip();
- readAheadBuffer.position(0);
- readAheadBuffer.flip();
- long skippedFromInputStream = underlyingInputStream.skip(toSkip);
- readAsync();
- return skippedBytes + skippedFromInputStream;
- }
- }
-
- @Override
- public void close() throws IOException {
- boolean isSafeToCloseUnderlyingInputStream = false;
- stateChangeLock.lock();
- try {
- if (isClosed) {
- return;
- }
- isClosed = true;
- if (!isReading) {
- // Nobody is reading, so we can close the underlying input stream in this method.
- isSafeToCloseUnderlyingInputStream = true;
- // Flip this to make sure the read ahead task will not close the underlying input stream.
- isUnderlyingInputStreamBeingClosed = true;
- }
- } finally {
- stateChangeLock.unlock();
- }
-
- try {
- executorService.shutdownNow();
- executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- InterruptedIOException iio = new InterruptedIOException(e.getMessage());
- iio.initCause(e);
- throw iio;
- } finally {
- if (isSafeToCloseUnderlyingInputStream) {
- underlyingInputStream.close();
- }
- }
- }
-}
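For reference, the reverted class was a drop-in InputStream wrapper: it is constructed over any
InputStream plus a buffer size, and it prefetches the next buffer on a single daemon thread while
the caller drains the active buffer. A minimal usage sketch, assuming a local FileInputStream
source and an arbitrary 64 KB buffer size (both are illustrative assumptions, not taken from this
commit):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import seaweedfs.client.ReadAheadInputStream;

public class ReadAheadExample {
    public static void main(String[] args) throws IOException {
        // Wrap any InputStream; the second argument sizes each of the two internal
        // buffers (active + read ahead). 64 KB here is an illustrative value only.
        try (InputStream in = new ReadAheadInputStream(new FileInputStream(args[0]), 64 * 1024)) {
            byte[] chunk = new byte[8192];
            long total = 0;
            int n;
            // While this loop drains the active buffer, the background thread is
            // already filling the read ahead buffer from the underlying stream.
            while ((n = in.read(chunk, 0, chunk.length)) != -1) {
                total += n;
            }
            System.out.println("read " + total + " bytes");
        }
    }
}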
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 836bb4db5..fd8877806 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerProto;
-import seaweedfs.client.ReadAheadInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024));
+ return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 836bb4db5..fd8877806 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerProto;
-import seaweedfs.client.ReadAheadInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024));
+ return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
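After the revert, both the hdfs2 and hdfs3 clients wrap the SeaweedFS stream in Hadoop's own
BufferedFSInputStream instead. A minimal sketch of that wrapping in isolation, assuming an
already-opened FSInputStream; the helper class OpenWrapper is hypothetical, and only the wrapping
line mirrors the "+" lines above:

import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;

// Hypothetical helper, not part of this commit; it isolates the post-revert wrapping.
class OpenWrapper {
    static FSDataInputStream wrap(FSInputStream inputStream) {
        // BufferedFSInputStream is Hadoop's synchronous buffered wrapper around an FSInputStream.
        // It preserves Seekable and PositionedReadable, which FSDataInputStream requires,
        // so it can stand in for the removed asynchronous read-ahead wrapper.
        // The 16 MB buffer size matches the value used in SeaweedFileSystem.open().
        return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
    }
}

The practical difference is that BufferedFSInputStream fills its buffer on the calling thread, so
reads no longer overlap with the caller's processing, but it avoids the extra thread, locking, and
shutdown handling of the reverted class.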