aboutsummaryrefslogtreecommitdiff
path: root/other/java/client
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-30 05:39:09 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-30 05:39:09 -0800
commit043c2d796098e1f88da1442eefdb9bd5df075708 (patch)
tree23a182f3279502d9f0de8a1bdd09633d00bff45d /other/java/client
parent678b9a60be690c6d4c8fb03c0a1bdd7516eaed8a (diff)
downloadseaweedfs-043c2d796098e1f88da1442eefdb9bd5df075708.tar.xz
seaweedfs-043c2d796098e1f88da1442eefdb9bd5df075708.zip
refactoring SeaweedOutputStream
Diffstat (limited to 'other/java/client')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java288
1 files changed, 288 insertions, 0 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
new file mode 100644
index 000000000..b09a15a5c
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -0,0 +1,288 @@
+package seaweedfs.client;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+
+public class SeaweedOutputStream extends OutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
+
+ private final FilerGrpcClient filerGrpcClient;
+ private final String path;
+ private final int bufferSize;
+ private final int maxConcurrentRequestCount;
+ private final ThreadPoolExecutor threadExecutor;
+ private final ExecutorCompletionService<Void> completionService;
+ private final FilerProto.Entry.Builder entry;
+ protected final boolean supportFlush = false; // true;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private long position;
+ private boolean closed;
+ private volatile IOException lastError;
+ private long lastFlushOffset;
+ private long lastTotalAppendOffset = 0;
+ private ByteBuffer buffer;
+ private long outputIndex;
+ private String replication = "000";
+
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.replication = replication;
+ this.path = path;
+ this.position = position;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = bufferSize;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+
+ this.entry = entry;
+
+ }
+
+ public static String getParentDirectory(String path) {
+ if (path.equals("/")) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(0, lastSlashIndex);
+ }
+
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+ try {
+ SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ this.lastFlushOffset = offset;
+ }
+
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[]{(byte) (byteVal & 0xFF)});
+ }
+
+ @Override
+ public synchronized void write(final byte[] data, final int off, final int length)
+ throws IOException {
+ maybeThrowLastError();
+
+ if (data == null) {
+ return;
+ }
+
+ if (off < 0 || length < 0 || length > data.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
+ int currentOffset = off;
+ int writableBytes = bufferSize - buffer.position();
+ int numberOfBytesToWrite = length;
+
+ while (numberOfBytesToWrite > 0) {
+
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
+ }
+
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
+ }
+
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the payload it is committed to the
+ * service. Data is queued for writing and forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (supportFlush) {
+ flushInternalAsync();
+ }
+ }
+
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete. Close the access to the stream and
+ * shutdown the upload thread pool.
+ * If the blob was created, its lease will be released.
+ * Any error encountered caught in threads and stored will be rethrown here
+ * after cleanup.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ LOG.debug("close path: {}", path);
+ try {
+ flushInternal();
+ threadExecutor.shutdown();
+ } finally {
+ lastError = new IOException("Stream is closed!");
+ ByteBufferPool.release(buffer);
+ buffer = null;
+ outputIndex = 0;
+ closed = true;
+ writeOperations.clear();
+ if (!threadExecutor.isShutdown()) {
+ threadExecutor.shutdownNow();
+ }
+ }
+ }
+
+ private synchronized void writeCurrentBufferToService() throws IOException {
+ if (buffer.position() == 0) {
+ return;
+ }
+
+ position += submitWriteBufferToService(buffer, position);
+
+ buffer = ByteBufferPool.request(bufferSize);
+
+ }
+
+ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
+
+ bufferToWrite.flip();
+ int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
+
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
+ waitForTaskToComplete();
+ }
+ final Future<Void> job = completionService.submit(() -> {
+ // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
+ // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ ByteBufferPool.release(bufferToWrite);
+ return null;
+ });
+
+ writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+
+ return bytesLength;
+
+ }
+
+ private void waitForTaskToComplete() throws IOException {
+ boolean completed;
+ for (completed = false; completionService.poll() != null; completed = true) {
+ // keep polling until there is no data
+ }
+
+ if (!completed) {
+ try {
+ completionService.take();
+ } catch (InterruptedException e) {
+ lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+ throw lastError;
+ }
+ }
+ }
+
+ private void maybeThrowLastError() throws IOException {
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Try to remove the completed write operations from the beginning of write
+ * operation FIFO queue.
+ */
+ private synchronized void shrinkWriteOperationQueue() throws IOException {
+ try {
+ while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+ writeOperations.peek().task.get();
+ lastTotalAppendOffset += writeOperations.peek().length;
+ writeOperations.remove();
+ }
+ } catch (Exception e) {
+ lastError = new IOException(e);
+ throw lastError;
+ }
+ }
+
+ protected synchronized void flushInternal() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToService();
+ }
+
+ protected synchronized void flushInternalAsync() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToServiceAsync();
+ }
+
+ private synchronized void flushWrittenBytesToService() throws IOException {
+ for (WriteOperation writeOperation : writeOperations) {
+ try {
+ writeOperation.task.get();
+ } catch (Exception ex) {
+ lastError = new IOException(ex);
+ throw lastError;
+ }
+ }
+ LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
+ flushWrittenBytesToServiceInternal(position);
+ }
+
+ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+ shrinkWriteOperationQueue();
+
+ if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
+ }
+ }
+
+ private static class WriteOperation {
+ private final Future<Void> task;
+ private final long startOffset;
+ private final long length;
+
+ WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+ this.task = task;
+ this.startOffset = startOffset;
+ this.length = length;
+ }
+ }
+
+}