aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-07-15 13:25:44 -0700
committerChris Lu <chris.lu@gmail.com>2020-07-15 13:25:44 -0700
commitbc3be0bb3756c559ba3495929f40982dba679fd5 (patch)
tree7732e7aa626230bd48d00540cfed7763bc2d5b8c /other/java
parent316e853e0e7f48e4330f452fe55e0349ad2c9a64 (diff)
downloadseaweedfs-bc3be0bb3756c559ba3495929f40982dba679fd5.tar.xz
seaweedfs-bc3be0bb3756c559ba3495929f40982dba679fd5.zip
Hadoop: 1.3.3
improve memory efficiency
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml2
-rw-r--r--other/java/client/pom.xml.deploy2
-rw-r--r--other/java/client/pom_debug.xml2
-rw-r--r--other/java/hdfs2/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs2/pom.xml2
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java39
-rw-r--r--other/java/hdfs3/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs3/pom.xml2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java48
9 files changed, 64 insertions, 37 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 3d4a1aff4..d35415a9d 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.3.2</version>
+ <version>1.3.3</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index 3d4a1aff4..d35415a9d 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.3.2</version>
+ <version>1.3.3</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 820eebbf2..1a8638fa8 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.3.2</version>
+ <version>1.3.3</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 1bd6289d3..61c00463f 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.3.2</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 5f6b375be..b67ce1efe 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.3.2</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index e08843caa..1100e9a53 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -14,6 +14,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.util.Arrays;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -28,16 +29,16 @@ public class SeaweedOutputStream extends OutputStream {
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
- private FilerProto.Entry.Builder entry;
+ private final FilerProto.Entry.Builder entry;
+ private final boolean supportFlush = true;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
- private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private byte[] buffer;
private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -50,18 +51,18 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
+ // this.buffer = new byte[bufferSize];
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
@@ -84,7 +85,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
+ throws IOException {
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
@@ -98,6 +99,22 @@ public class SeaweedOutputStream extends OutputStream {
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
+
+ if (buffer == null) {
+ buffer = new byte[32];
+ }
+ // ensureCapacity
+ if (numberOfBytesToWrite > buffer.length - bufferIndex) {
+ int capacity = buffer.length;
+ while(capacity-bufferIndex<numberOfBytesToWrite){
+ capacity = capacity << 1;
+ }
+ if (capacity < 0) {
+ throw new OutOfMemoryError();
+ }
+ buffer = Arrays.copyOf(buffer, capacity);
+ }
+
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
@@ -165,7 +182,7 @@ public class SeaweedOutputStream extends OutputStream {
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
- buffer = new byte[bufferSize];
+ buffer = null; // new byte[bufferSize];
bufferIndex = 0;
final long offset = position;
position += bytesLength;
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index bb897e4b7..77f4688f8 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.3.2</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 780c84e17..cef029c2d 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.3.2</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.3</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 96af27fe0..0dd094b55 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -16,14 +16,8 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.util.Locale;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -37,16 +31,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
- private FilerProto.Entry.Builder entry;
+ private final FilerProto.Entry.Builder entry;
+ private final boolean supportFlush = true;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
- private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private byte[] buffer;
private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -59,18 +53,18 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
+ // this.buffer = new byte[bufferSize];
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
@@ -93,7 +87,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
@Override
public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
+ throws IOException {
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
@@ -107,6 +101,22 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
+
+ if (buffer == null) {
+ buffer = new byte[32];
+ }
+ // ensureCapacity
+ if (numberOfBytesToWrite > buffer.length - bufferIndex) {
+ int capacity = buffer.length;
+ while(capacity-bufferIndex<numberOfBytesToWrite){
+ capacity = capacity << 1;
+ }
+ if (capacity < 0) {
+ throw new OutOfMemoryError();
+ }
+ buffer = Arrays.copyOf(buffer, capacity);
+ }
+
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
@@ -217,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
- buffer = new byte[bufferSize];
+ buffer = null; // new byte[bufferSize];
bufferIndex = 0;
final long offset = position;
position += bytesLength;