From bc3be0bb3756c559ba3495929f40982dba679fd5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 15 Jul 2020 13:25:44 -0700 Subject: Hadoop: 1.3.3 improve memory efficiency --- .../java/seaweed/hdfs/SeaweedOutputStream.java | 39 ++++++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) (limited to 'other/java/hdfs2/src') diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index e08843caa..1100e9a53 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -14,6 +14,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.Arrays; import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -28,16 +29,16 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; - private FilerProto.Entry.Builder entry; + private final FilerProto.Entry.Builder entry; + private final boolean supportFlush = true; + private final ConcurrentLinkedDeque writeOperations; private long position; private boolean closed; - private boolean supportFlush = true; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; private byte[] buffer; private int bufferIndex; - private ConcurrentLinkedDeque writeOperations; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -50,18 +51,18 @@ public class SeaweedOutputStream extends OutputStream { this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; + // this.buffer = new byte[bufferSize]; this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue()); + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; @@ -84,7 +85,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { + throws IOException { maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -98,6 +99,22 @@ public class SeaweedOutputStream extends OutputStream { int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { + + if (buffer == null) { + buffer = new byte[32]; + } + // ensureCapacity + if (numberOfBytesToWrite > buffer.length - bufferIndex) { + int capacity = buffer.length; + while(capacity-bufferIndex