author     Chris Lu <chris.lu@gmail.com>  2021-01-30 05:39:09 -0800
committer  Chris Lu <chris.lu@gmail.com>  2021-01-30 05:39:09 -0800
commit     043c2d796098e1f88da1442eefdb9bd5df075708 (patch)
tree       23a182f3279502d9f0de8a1bdd09633d00bff45d
parent     678b9a60be690c6d4c8fb03c0a1bdd7516eaed8a (diff)
download   seaweedfs-043c2d796098e1f88da1442eefdb9bd5df075708.tar.xz
           seaweedfs-043c2d796098e1f88da1442eefdb9bd5df075708.zip
refactoring SeaweedOutputStream
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java) |  41
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java |   2
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java |  16
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java |   2
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java |  64
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 337
6 files changed, 101 insertions(+), 361 deletions(-)
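
In this commit, SeaweedOutputStream moves out of the Hadoop-specific seaweed.hdfs package into the shared seaweedfs.client library: the org.apache.hadoop.fs.Path field becomes a plain String, the Guava Preconditions and Hadoop FSExceptionMessages dependencies are dropped, and getParentDirectory() is inlined as a static helper. Hadoop-specific behavior (Syncable, StreamCapabilities) is reintroduced by a thin SeaweedHadoopOutputStream subclass in each of the hdfs2 and hdfs3 modules. A minimal sketch of using the relocated class from plain Java, outside Hadoop (the client construction is left to the caller, and the path, buffer size, and entry fields are illustrative assumptions, not taken from the commit):

import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedOutputStream;

public class SeaweedWriteExample {
    // The caller supplies a connected FilerGrpcClient; constructing one is
    // outside the scope of this commit, so it is not shown here.
    static void writeHello(FilerGrpcClient client) throws java.io.IOException {
        // bare Entry builder for a new file; real callers also set attributes
        FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder()
                .setName("hello.txt")
                .setIsDirectory(false);
        // constructor signature as changed by this commit:
        // (client, String path, entry, position, bufferSize, replication)
        try (SeaweedOutputStream out = new SeaweedOutputStream(
                client, "/buckets/test/hello.txt", entry, 0L, 8 * 1024 * 1024, "000")) {
            out.write("hello, seaweedfs".getBytes());
        }
    }
}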
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 26290c46c..b09a15a5c 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -1,16 +1,9 @@
-package seaweed.hdfs;
+package seaweedfs.client;
// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -18,20 +11,18 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.*;
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
private final FilerGrpcClient filerGrpcClient;
- private final Path path;
+ private final String path;
private final int bufferSize;
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = false; // true;
+ protected final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
@@ -42,7 +33,7 @@ public class SeaweedOutputStream extends OutputStream {
private long outputIndex;
private String replication = "000";
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient;
this.replication = replication;
@@ -70,6 +61,14 @@ public class SeaweedOutputStream extends OutputStream {
}
+ public static String getParentDirectory(String path) {
+ if (path.equals("/")) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(0, lastSlashIndex);
+ }
+
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
@@ -89,7 +88,9 @@ public class SeaweedOutputStream extends OutputStream {
throws IOException {
maybeThrowLastError();
- Preconditions.checkArgument(data != null, "null data");
+ if (data == null) {
+ return;
+ }
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
@@ -152,7 +153,7 @@ public class SeaweedOutputStream extends OutputStream {
flushInternal();
threadExecutor.shutdown();
} finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ lastError = new IOException("Stream is closed!");
ByteBufferPool.release(buffer);
buffer = null;
outputIndex = 0;
@@ -185,7 +186,7 @@ public class SeaweedOutputStream extends OutputStream {
}
final Future<Void> job = completionService.submit(() -> {
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite);
return null;
@@ -239,13 +240,13 @@ public class SeaweedOutputStream extends OutputStream {
}
}
- private synchronized void flushInternal() throws IOException {
+ protected synchronized void flushInternal() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService();
}
- private synchronized void flushInternalAsync() throws IOException {
+ protected synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
@@ -278,10 +279,6 @@ public class SeaweedOutputStream extends OutputStream {
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
this.task = task;
this.startOffset = startOffset;
this.length = length;
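
The inlined getParentDirectory() helper replaces the static import from seaweed.hdfs.SeaweedFileSystemStore and assumes absolute, slash-separated paths, matching the String paths the class now receives. Worked examples of its behavior, derived from the code above rather than from the commit itself:

// parent of the root is the root itself (the explicit special case)
getParentDirectory("/");                    // -> "/"
// everything after the last slash is dropped
getParentDirectory("/buckets/test/a.txt");  // -> "/buckets/test"
// a file directly under the root yields "" rather than "/"
getParentDirectory("/a.txt");               // -> ""
// a relative path with no slash at all would make lastIndexOf return -1
// and substring(0, -1) throw StringIndexOutOfBoundsException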
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..719e52579 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -216,7 +216,7 @@ public class SeaweedFileSystemStore {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+ return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication);
}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..f7a6225d8
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,16 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream {
+
+ public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ super(filerGrpcClient, path.toString(), entry, position, bufferSize, replication);
+ }
+
+}
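
The Hadoop 2 adapter adds no behavior of its own; it only re-homes the class under seaweed.hdfs so existing call sites keep working (note that path is already a String here, so the path.toString() in the super call is redundant but harmless). User code still reaches it through the standard FileSystem API. A hedged sketch, assuming the usual fs.<scheme>.impl registration and the seaweed.hdfs.SeaweedFileSystem class from this repository; the host and port are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // map the seaweedfs:// scheme to the SeaweedFS FileSystem implementation
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
        // FileSystem.create() routes through SeaweedFileSystemStore.createFile(),
        // which after this commit returns a SeaweedHadoopOutputStream
        try (FSDataOutputStream out = fs.create(new Path("/buckets/test/hello.txt"))) {
            out.write("hello".getBytes());
        }
    }
}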
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..719e52579 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -216,7 +216,7 @@ public class SeaweedFileSystemStore {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+ return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication);
}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..f65aef619
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,64 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+
+ public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ super(filerGrpcClient, path, entry, position, bufferSize, replication);
+ }
+
+ /**
+ * Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the disk device (but the disk may have it in its cache).
+ *
+ * @throws IOException if error occurs
+ */
+ @Override
+ public void hsync() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Flush out the data in client's user buffer. After the return of
+ * this call, new readers will see the data.
+ *
+ * @throws IOException if any error occurs
+ */
+ @Override
+ public void hflush() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Query the stream for a specific capability.
+ *
+ * @param capability string to query the stream support for.
+ * @return true for hsync and hflush.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ switch (capability.toLowerCase(Locale.ENGLISH)) {
+ case StreamCapabilities.HSYNC:
+ case StreamCapabilities.HFLUSH:
+ return supportFlush;
+ default:
+ return false;
+ }
+ }
+
+}
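
Because supportFlush is hard-coded to false in the base class, hsync() and hflush() are no-ops in this commit, and capability probes answer accordingly. A small sketch of what a caller observes (the stream instance is assumed to come from FileSystem.create() as above):

import org.apache.hadoop.fs.StreamCapabilities;
import seaweed.hdfs.SeaweedHadoopOutputStream;

class CapabilityCheck {
    // Both probes consult the supportFlush flag, which this commit leaves
    // false, so this returns false until flush support is enabled.
    static boolean flushSupported(SeaweedHadoopOutputStream out) {
        return out.hasCapability(StreamCapabilities.HSYNC)
                || out.hasCapability(StreamCapabilities.HFLUSH);
    }
}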
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
deleted file mode 100644
index d4c967a06..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Locale;
-import java.util.concurrent.*;
-
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
-public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Path path;
- private final int bufferSize;
- private final int maxConcurrentRequestCount;
- private final ThreadPoolExecutor threadExecutor;
- private final ExecutorCompletionService<Void> completionService;
- private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = false; // true;
- private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private long position;
- private boolean closed;
- private volatile IOException lastError;
- private long lastFlushOffset;
- private long lastTotalAppendOffset = 0;
- private ByteBuffer buffer;
- private long outputIndex;
- private String replication = "000";
-
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
- this.filerGrpcClient = filerGrpcClient;
- this.replication = replication;
- this.path = path;
- this.position = position;
- this.closed = false;
- this.lastError = null;
- this.lastFlushOffset = 0;
- this.bufferSize = bufferSize;
- this.buffer = ByteBufferPool.request(bufferSize);
- this.outputIndex = 0;
- this.writeOperations = new ConcurrentLinkedDeque<>();
-
- this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 120L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
- this.entry = entry;
-
- }
-
- private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
- try {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
- } catch (Exception ex) {
- throw new IOException(ex);
- }
- this.lastFlushOffset = offset;
- }
-
- @Override
- public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
- }
-
- @Override
- public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
- maybeThrowLastError();
-
- Preconditions.checkArgument(data != null, "null data");
-
- if (off < 0 || length < 0 || length > data.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
- int currentOffset = off;
- int writableBytes = bufferSize - buffer.position();
- int numberOfBytesToWrite = length;
-
- while (numberOfBytesToWrite > 0) {
-
- if (numberOfBytesToWrite < writableBytes) {
- buffer.put(data, currentOffset, numberOfBytesToWrite);
- outputIndex += numberOfBytesToWrite;
- break;
- }
-
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
- buffer.put(data, currentOffset, writableBytes);
- outputIndex += writableBytes;
- currentOffset += writableBytes;
- writeCurrentBufferToService();
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- writableBytes = bufferSize - buffer.position();
- }
-
- }
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be
- * written out. If any data remains in the payload it is committed to the
- * service. Data is queued for writing and forced out to the service
- * before the call returns.
- */
- @Override
- public void flush() throws IOException {
- if (supportFlush) {
- flushInternalAsync();
- }
- }
-
- /**
- * Similar to posix fsync, flush out the data in client's user buffer
- * all the way to the disk device (but the disk may have it in its cache).
- *
- * @throws IOException if error occurs
- */
- @Override
- public void hsync() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Flush out the data in client's user buffer. After the return of
- * this call, new readers will see the data.
- *
- * @throws IOException if any error occurs
- */
- @Override
- public void hflush() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Query the stream for a specific capability.
- *
- * @param capability string to query the stream support for.
- * @return true for hsync and hflush.
- */
- @Override
- public boolean hasCapability(String capability) {
- switch (capability.toLowerCase(Locale.ENGLISH)) {
- case StreamCapabilities.HSYNC:
- case StreamCapabilities.HFLUSH:
- return supportFlush;
- default:
- return false;
- }
- }
-
- /**
- * Force all data in the output stream to be written to Azure storage.
- * Wait to return until this is complete. Close the access to the stream and
- * shutdown the upload thread pool.
- * If the blob was created, its lease will be released.
- * Any error encountered caught in threads and stored will be rethrown here
- * after cleanup.
- */
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
-
- LOG.debug("close path: {}", path);
- try {
- flushInternal();
- threadExecutor.shutdown();
- } finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- ByteBufferPool.release(buffer);
- buffer = null;
- outputIndex = 0;
- closed = true;
- writeOperations.clear();
- if (!threadExecutor.isShutdown()) {
- threadExecutor.shutdownNow();
- }
- }
- }
-
- private synchronized void writeCurrentBufferToService() throws IOException {
- if (buffer.position() == 0) {
- return;
- }
-
- position += submitWriteBufferToService(buffer, position);
-
- buffer = ByteBufferPool.request(bufferSize);
-
- }
-
- private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
-
- bufferToWrite.flip();
- int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
-
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
- waitForTaskToComplete();
- }
- final Future<Void> job = completionService.submit(() -> {
- // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
- // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- ByteBufferPool.release(bufferToWrite);
- return null;
- });
-
- writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
-
- return bytesLength;
-
- }
-
- private void waitForTaskToComplete() throws IOException {
- boolean completed;
- for (completed = false; completionService.poll() != null; completed = true) {
- // keep polling until there is no data
- }
-
- if (!completed) {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
- throw lastError;
- }
- }
- }
-
- private void maybeThrowLastError() throws IOException {
- if (lastError != null) {
- throw lastError;
- }
- }
-
- /**
- * Try to remove the completed write operations from the beginning of write
- * operation FIFO queue.
- */
- private synchronized void shrinkWriteOperationQueue() throws IOException {
- try {
- while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
- writeOperations.peek().task.get();
- lastTotalAppendOffset += writeOperations.peek().length;
- writeOperations.remove();
- }
- } catch (Exception e) {
- lastError = new IOException(e);
- throw lastError;
- }
- }
-
- private synchronized void flushInternal() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToService();
- }
-
- private synchronized void flushInternalAsync() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToServiceAsync();
- }
-
- private synchronized void flushWrittenBytesToService() throws IOException {
- for (WriteOperation writeOperation : writeOperations) {
- try {
- writeOperation.task.get();
- } catch (Exception ex) {
- lastError = new IOException(ex);
- throw lastError;
- }
- }
- LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
- flushWrittenBytesToServiceInternal(position);
- }
-
- private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
- shrinkWriteOperationQueue();
-
- if (this.lastTotalAppendOffset > this.lastFlushOffset) {
- this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
- }
- }
-
- private static class WriteOperation {
- private final Future<Void> task;
- private final long startOffset;
- private final long length;
-
- WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
- this.task = task;
- this.startOffset = startOffset;
- this.length = length;
- }
- }
-
-}
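
The deleted hdfs3 copy shows what the refactor removes: it duplicated the hdfs2 stream line for line and added only the Syncable and StreamCapabilities methods that now live in SeaweedHadoopOutputStream. One behavior the move leaves unchanged: with supportFlush false, flush() does nothing, so buffered data reaches the filer only at close(). A minimal illustration, derived from the code above:

import java.io.OutputStream;

class FlushBehavior {
    // flush() is gated on supportFlush (currently false), so it is a no-op;
    // close() runs flushInternal(), committing the buffer and the metadata.
    static void finish(OutputStream out) throws java.io.IOException {
        out.flush(); // no-op for SeaweedOutputStream in this commit
        out.close(); // writes remaining buffered data and entry metadata
    }
}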