aboutsummaryrefslogtreecommitdiff
path: root/other/java/hdfs2
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-07-15 23:33:31 -0700
committerChris Lu <chris.lu@gmail.com>2020-07-15 23:33:31 -0700
commit7bca72deedb872402b4e597976eb198f3dad2d74 (patch)
treeec8246e075538b1cae71f997a761f84a6d511c86 /other/java/hdfs2
parent2286d27730ba0fd38654130ae8bb02181e3b45fb (diff)
downloadseaweedfs-7bca72deedb872402b4e597976eb198f3dad2d74.tar.xz
seaweedfs-7bca72deedb872402b4e597976eb198f3dad2d74.zip
reuse bytebuffer
Diffstat (limited to 'other/java/hdfs2')
-rw-r--r--other/java/hdfs2/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs2/pom.xml2
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java88
3 files changed, 32 insertions, 60 deletions
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 64de808c4..06bfda480 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.3.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.5</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 8bbd31f29..24699b99a 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.3.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.5</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 1100e9a53..1138ecca2 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@@ -14,7 +15,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -30,15 +31,15 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = true;
+ private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
+ private ByteBuffer buffer;
+ private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -51,8 +52,8 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- // this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@@ -94,41 +95,29 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
+ int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
- if (buffer == null) {
- buffer = new byte[32];
- }
- // ensureCapacity
- if (numberOfBytesToWrite > buffer.length - bufferIndex) {
- int capacity = buffer.length;
- while(capacity-bufferIndex<numberOfBytesToWrite){
- capacity = capacity << 1;
- }
- if (capacity < 0) {
- throw new OutOfMemoryError();
- }
- buffer = Arrays.copyOf(buffer, capacity);
- }
-
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
}
- writableBytes = bufferSize - bufferIndex;
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
}
+
}
/**
@@ -164,8 +153,9 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ ByteBufferPool.release(buffer);
buffer = null;
- bufferIndex = 0;
+ outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@@ -175,35 +165,17 @@ public class SeaweedOutputStream extends OutputStream {
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
+ if (buffer.position() == 0) {
return;
}
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
-
- buffer = null; // new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
+ buffer.flip();
+ int bytesLength = buffer.limit() - buffer.position();
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
+ // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
position += bytesLength;
+ buffer.clear();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {