aboutsummaryrefslogtreecommitdiff
path: root/other
diff options
context:
space:
mode:
Diffstat (limited to 'other')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java14
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java6
2 files changed, 13 insertions, 7 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index d5c3399ed..68c281992 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -33,6 +33,7 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer;
private long outputIndex;
private String replication = "";
+ private String collection = "";
public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
this(filerClient, fullpath, "");
@@ -53,7 +54,6 @@ public class SeaweedOutputStream extends OutputStream {
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.buffer = ByteBufferPool.request(bufferSize);
- this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
@@ -83,6 +83,13 @@ public class SeaweedOutputStream extends OutputStream {
}
+ public void setReplication(String replication) {
+ this.replication = replication;
+ }
+ public void setCollection(String collection) {
+ this.collection = collection;
+ }
+
public static String getParentDirectory(String path) {
int protoIndex = path.indexOf("://");
if (protoIndex >= 0) {
@@ -144,13 +151,11 @@ public class SeaweedOutputStream extends OutputStream {
if (numberOfBytesToWrite < writableBytes) {
buffer.put(data, currentOffset, numberOfBytesToWrite);
- outputIndex += numberOfBytesToWrite;
break;
}
// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
- outputIndex += writableBytes;
currentOffset += writableBytes;
writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
@@ -194,7 +199,6 @@ public class SeaweedOutputStream extends OutputStream {
lastError = new IOException("Stream is closed!");
ByteBufferPool.release(buffer);
buffer = null;
- outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@@ -225,7 +229,7 @@ public class SeaweedOutputStream extends OutputStream {
}
final Future<Void> job = completionService.submit(() -> {
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
+ SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite);
return null;
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index f477303c9..88c7cefbe 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -26,6 +26,7 @@ public class SeaweedWrite {
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
+ String collection,
final FilerClient filerClient,
final long offset,
final byte[] bytes,
@@ -36,7 +37,7 @@ public class SeaweedWrite {
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
try {
FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
- replication, filerClient, offset, bytes, bytesOffset, bytesLength, path);
+ replication, collection, filerClient, offset, bytes, bytesOffset, bytesLength, path);
lastException = null;
synchronized (entry) {
entry.addChunks(chunkBuilder);
@@ -59,6 +60,7 @@ public class SeaweedWrite {
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final String collection,
final FilerClient filerClient,
final long offset,
final byte[] bytes,
@@ -67,7 +69,7 @@ public class SeaweedWrite {
final String path) throws IOException {
FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection(filerClient.getCollection())
+ .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
.setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
.setDataCenter("")
.setTtlSec(0)