From 7bca72deedb872402b4e597976eb198f3dad2d74 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 15 Jul 2020 23:33:31 -0700 Subject: reuse bytebuffer --- .../java/seaweed/hdfs/SeaweedOutputStream.java | 88 ++++++++-------------- 1 file changed, 30 insertions(+), 58 deletions(-) (limited to 'other/java/hdfs2/src') diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 1100e9a53..1138ecca2 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.ByteBufferPool; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedWrite; @@ -14,7 +15,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.util.Arrays; +import java.nio.ByteBuffer; import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -30,15 +31,15 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; private final FilerProto.Entry.Builder entry; - private final boolean supportFlush = true; + private final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque writeOperations; private long position; private boolean closed; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; + private ByteBuffer buffer; + private long outputIndex; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -51,8 +52,8 @@ public class SeaweedOutputStream extends OutputStream { this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - // this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); @@ -94,41 +95,29 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; + int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { - if (buffer == null) { - buffer = new byte[32]; - } - // ensureCapacity - if (numberOfBytesToWrite > buffer.length - bufferIndex) { - int capacity = buffer.length; - while(capacity-bufferIndex= maxConcurrentRequestCount * 2) { - waitForTaskToComplete(); - } - - final Future job = completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - // originally: client.append(path, offset, bytes, 0, bytesLength); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); - return null; - } - }); - - writeOperations.add(new WriteOperation(job, offset, bytesLength)); - - // Try to shrink the queue - shrinkWriteOperationQueue(); } private void waitForTaskToComplete() throws IOException { -- cgit v1.2.3