diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-07-15 13:25:44 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-07-15 13:25:44 -0700 |
| commit | bc3be0bb3756c559ba3495929f40982dba679fd5 (patch) | |
| tree | 7732e7aa626230bd48d00540cfed7763bc2d5b8c | |
| parent | 316e853e0e7f48e4330f452fe55e0349ad2c9a64 (diff) | |
| download | seaweedfs-bc3be0bb3756c559ba3495929f40982dba679fd5.tar.xz seaweedfs-bc3be0bb3756c559ba3495929f40982dba679fd5.zip | |
Hadoop: 1.3.3
improve memory efficiency
| -rw-r--r-- | other/java/client/pom.xml | 2 | ||||
| -rw-r--r-- | other/java/client/pom.xml.deploy | 2 | ||||
| -rw-r--r-- | other/java/client/pom_debug.xml | 2 | ||||
| -rw-r--r-- | other/java/hdfs2/dependency-reduced-pom.xml | 2 | ||||
| -rw-r--r-- | other/java/hdfs2/pom.xml | 2 | ||||
| -rw-r--r-- | other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 39 | ||||
| -rw-r--r-- | other/java/hdfs3/dependency-reduced-pom.xml | 2 | ||||
| -rw-r--r-- | other/java/hdfs3/pom.xml | 2 | ||||
| -rw-r--r-- | other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 48 |
9 files changed, 64 insertions, 37 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 3d4a1aff4..d35415a9d 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.3.2</version> + <version>1.3.3</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 3d4a1aff4..d35415a9d 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.3.2</version> + <version>1.3.3</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 820eebbf2..1a8638fa8 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.3.2</version> + <version>1.3.3</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 1bd6289d3..61c00463f 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.3.2</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 5f6b375be..b67ce1efe 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.3.2</seaweedfs.client.version> + <seaweedfs.client.version>1.3.3</seaweedfs.client.version> <hadoop.version>2.9.2</hadoop.version> </properties> diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index e08843caa..1100e9a53 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -14,6 +14,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.Arrays; import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -28,16 +29,16 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - private FilerProto.Entry.Builder entry; + private final FilerProto.Entry.Builder entry; + private final boolean supportFlush = true; + private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private long position; private boolean closed; - private boolean supportFlush = true; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; private byte[] buffer; private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -50,18 +51,18 @@ public class SeaweedOutputStream extends OutputStream { this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; + // this.buffer = new byte[bufferSize]; this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; @@ -84,7 +85,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { + throws IOException { maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -98,6 +99,22 @@ public class SeaweedOutputStream extends OutputStream { int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { + + if (buffer == null) { + buffer = new byte[32]; + } + // ensureCapacity + if (numberOfBytesToWrite > buffer.length - bufferIndex) { + int capacity = buffer.length; + while(capacity-bufferIndex<numberOfBytesToWrite){ + capacity = capacity << 1; + } + if (capacity < 0) { + throw new OutOfMemoryError(); + } + buffer = Arrays.copyOf(buffer, capacity); + } + if (writableBytes <= numberOfBytesToWrite) { System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); bufferIndex += writableBytes; @@ -165,7 +182,7 @@ public class SeaweedOutputStream extends OutputStream { final byte[] bytes = buffer; final int bytesLength = bufferIndex; - buffer = new byte[bufferSize]; + buffer = null; // new byte[bufferSize]; bufferIndex = 0; final long offset = position; position += bytesLength; diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index bb897e4b7..77f4688f8 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.3.2</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 780c84e17..cef029c2d 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.3.2</seaweedfs.client.version> + <seaweedfs.client.version>1.3.3</seaweedfs.client.version> <hadoop.version>3.1.1</hadoop.version> </properties> diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 96af27fe0..0dd094b55 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -16,14 +16,8 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.util.Locale; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -37,16 +31,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - private FilerProto.Entry.Builder entry; + private final FilerProto.Entry.Builder entry; + private final boolean supportFlush = true; + private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private long position; private boolean closed; - private boolean supportFlush = true; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; private byte[] buffer; private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -59,18 +53,18 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; + // this.buffer = new byte[bufferSize]; this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; @@ -93,7 +87,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea @Override public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { + throws IOException { maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -107,6 +101,22 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { + + if (buffer == null) { + buffer = new byte[32]; + } + // ensureCapacity + if (numberOfBytesToWrite > buffer.length - bufferIndex) { + int capacity = buffer.length; + while(capacity-bufferIndex<numberOfBytesToWrite){ + capacity = capacity << 1; + } + if (capacity < 0) { + throw new OutOfMemoryError(); + } + buffer = Arrays.copyOf(buffer, capacity); + } + if (writableBytes <= numberOfBytesToWrite) { System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); bufferIndex += writableBytes; @@ -217,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea final byte[] bytes = buffer; final int bytesLength = bufferIndex; - buffer = new byte[bufferSize]; + buffer = null; // new byte[bufferSize]; bufferIndex = 0; final long offset = position; position += bytesLength; |
