diff options
Diffstat (limited to 'other/java/client/src')
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java | 8 | ||||
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java | 58 |
2 files changed, 61 insertions, 5 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 519ff0fd9..8b26c242c 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -28,11 +28,13 @@ public class SeaweedInputStream extends InputStream { public SeaweedInputStream( final FilerGrpcClient filerGrpcClient, - final String dir, final String name) throws IOException { + final String fullpath) throws IOException { this.filerGrpcClient = filerGrpcClient; - this.path = dir; + this.path = fullpath; FilerClient filerClient = new FilerClient(filerGrpcClient); - this.entry = filerClient.lookupEntry(dir, name); + this.entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(fullpath), + SeaweedOutputStream.getFileName(fullpath)); this.contentLength = SeaweedRead.fileSize(entry); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index b09a15a5c..a98bbd1ab 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - + protected final boolean supportFlush = false; // true; private final FilerGrpcClient filerGrpcClient; private final String path; private final int bufferSize; @@ -22,7 +22,6 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; private final FilerProto.Entry.Builder entry; - protected final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private long position; private boolean closed; @@ -32,6 +31,45 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = "000"; + private boolean shouldSaveMetadata = false; + + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { + this.filerGrpcClient = filerGrpcClient; + this.path = fullpath; + this.position = 0; + this.closed = false; + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = 8 * 1024 * 1024; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + this.shouldSaveMetadata = true; + + } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { @@ -66,9 +104,20 @@ public class SeaweedOutputStream extends OutputStream { return path; } int lastSlashIndex = path.lastIndexOf("/"); + if (lastSlashIndex == 0) { + return "/"; + } return path.substring(0, lastSlashIndex); } + public static String getFileName(String path) { + if (path.indexOf("/") < 0) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + return path.substring(lastSlashIndex+1); + } + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); @@ -163,6 +212,11 @@ public class SeaweedOutputStream extends OutputStream { threadExecutor.shutdownNow(); } } + + if (shouldSaveMetadata) { + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + } + } private synchronized void writeCurrentBufferToService() throws IOException { |
