aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-04 20:30:49 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-04 20:30:49 -0800
commit9fa7977714d99e546cee32e02b5f7fdf3528078b (patch)
treea2c24d1577800fa4792a7ed8417381d8769b629b
parentce416d765f42f673e84723703f75c8d0ab151889 (diff)
downloadseaweedfs-9fa7977714d99e546cee32e02b5f7fdf3528078b.tar.xz
seaweedfs-9fa7977714d99e546cee32e02b5f7fdf3528078b.zip
Java: SeaweedOutputStream refactoring
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java61
1 files changed, 18 insertions, 43 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 94f34b221..f9df22c9b 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -21,8 +21,9 @@ public class SeaweedOutputStream extends OutputStream {
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
- private final FilerProto.Entry.Builder entry;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private final boolean shouldSaveMetadata = false;
+ private FilerProto.Entry.Builder entry;
private long position;
private boolean closed;
private volatile IOException lastError;
@@ -31,49 +32,13 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer;
private long outputIndex;
private String replication = "000";
- private boolean shouldSaveMetadata = false;
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) {
this(filerGrpcClient, fullpath, "000");
}
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) {
- this.replication = replication;
- this.filerGrpcClient = filerGrpcClient;
- this.path = fullpath;
- this.position = 0;
- this.closed = false;
- this.lastError = null;
- this.lastFlushOffset = 0;
- this.bufferSize = 8 * 1024 * 1024;
- this.buffer = ByteBufferPool.request(bufferSize);
- this.outputIndex = 0;
- this.writeOperations = new ConcurrentLinkedDeque<>();
-
- this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 120L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
- long now = System.currentTimeMillis() / 1000L;
-
- this.entry = FilerProto.Entry.newBuilder()
- .setName(getFileName(path))
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setFileMode(0755)
- .setReplication(replication)
- .setCrtime(now)
- .setMtime(now)
- .clearGroupName()
- );
- this.shouldSaveMetadata = true;
-
+ this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000");
}
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
@@ -101,6 +66,20 @@ public class SeaweedOutputStream extends OutputStream {
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
+ if (this.entry == null) {
+ long now = System.currentTimeMillis() / 1000L;
+
+ this.entry = FilerProto.Entry.newBuilder()
+ .setName(getFileName(path))
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(0755)
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .clearGroupName()
+ );
+ }
}
@@ -120,7 +99,7 @@ public class SeaweedOutputStream extends OutputStream {
return path;
}
int lastSlashIndex = path.lastIndexOf("/");
- return path.substring(lastSlashIndex+1);
+ return path.substring(lastSlashIndex + 1);
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
@@ -218,10 +197,6 @@ public class SeaweedOutputStream extends OutputStream {
}
}
- if (shouldSaveMetadata) {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
- }
-
}
private synchronized void writeCurrentBufferToService() throws IOException {