diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-02-04 20:30:49 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-04 20:30:49 -0800 |
| commit | 9fa7977714d99e546cee32e02b5f7fdf3528078b (patch) | |
| tree | a2c24d1577800fa4792a7ed8417381d8769b629b | |
| parent | ce416d765f42f673e84723703f75c8d0ab151889 (diff) | |
| download | seaweedfs-9fa7977714d99e546cee32e02b5f7fdf3528078b.tar.xz seaweedfs-9fa7977714d99e546cee32e02b5f7fdf3528078b.zip | |
Java: SeaweedOutputStream refactoring
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java | 61 |
1 files changed, 18 insertions, 43 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 94f34b221..f9df22c9b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -21,8 +21,9 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - private final FilerProto.Entry.Builder entry; private final ConcurrentLinkedDeque<WriteOperation> writeOperations; + private final boolean shouldSaveMetadata = false; + private FilerProto.Entry.Builder entry; private long position; private boolean closed; private volatile IOException lastError; @@ -31,49 +32,13 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = "000"; - private boolean shouldSaveMetadata = false; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { this(filerGrpcClient, fullpath, "000"); } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { - this.replication = replication; - this.filerGrpcClient = filerGrpcClient; - this.path = fullpath; - this.position = 0; - this.closed = false; - this.lastError = null; - this.lastFlushOffset = 0; - this.bufferSize = 8 * 1024 * 1024; - this.buffer = ByteBufferPool.request(bufferSize); - this.outputIndex = 0; - this.writeOperations = new ConcurrentLinkedDeque<>(); - - this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); - - long now = System.currentTimeMillis() / 1000L; - - this.entry = FilerProto.Entry.newBuilder() - .setName(getFileName(path)) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setFileMode(0755) - .setReplication(replication) - .setCrtime(now) - .setMtime(now) - .clearGroupName() - ); - this.shouldSaveMetadata = true; - + this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, @@ -101,6 +66,20 @@ public class SeaweedOutputStream extends OutputStream { this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; + if (this.entry == null) { + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + } } @@ -120,7 +99,7 @@ public class SeaweedOutputStream extends OutputStream { return path; } int lastSlashIndex = path.lastIndexOf("/"); - return path.substring(lastSlashIndex+1); + return path.substring(lastSlashIndex + 1); } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { @@ -218,10 +197,6 @@ public class SeaweedOutputStream extends OutputStream { } } - if (shouldSaveMetadata) { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); - } - } private synchronized void writeCurrentBufferToService() throws IOException { |
