aboutsummaryrefslogtreecommitdiff
path: root/other/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client/src')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java8
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java58
2 files changed, 61 insertions, 5 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 519ff0fd9..8b26c242c 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -28,11 +28,13 @@ public class SeaweedInputStream extends InputStream {
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
- final String dir, final String name) throws IOException {
+ final String fullpath) throws IOException {
this.filerGrpcClient = filerGrpcClient;
- this.path = dir;
+ this.path = fullpath;
FilerClient filerClient = new FilerClient(filerGrpcClient);
- this.entry = filerClient.lookupEntry(dir, name);
+ this.entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(fullpath),
+ SeaweedOutputStream.getFileName(fullpath));
this.contentLength = SeaweedRead.fileSize(entry);
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index b09a15a5c..a98bbd1ab 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -14,7 +14,7 @@ import java.util.concurrent.*;
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
+ protected final boolean supportFlush = false; // true;
private final FilerGrpcClient filerGrpcClient;
private final String path;
private final int bufferSize;
@@ -22,7 +22,6 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry;
- protected final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
@@ -32,6 +31,45 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer;
private long outputIndex;
private String replication = "000";
+ private boolean shouldSaveMetadata = false;
+
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.path = fullpath;
+ this.position = 0;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = 8 * 1024 * 1024;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+
+ long now = System.currentTimeMillis() / 1000L;
+
+ this.entry = FilerProto.Entry.newBuilder()
+ .setName(getFileName(path))
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(0755)
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .clearGroupName()
+ );
+ this.shouldSaveMetadata = true;
+
+ }
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
@@ -66,9 +104,20 @@ public class SeaweedOutputStream extends OutputStream {
return path;
}
int lastSlashIndex = path.lastIndexOf("/");
+ if (lastSlashIndex == 0) {
+ return "/";
+ }
return path.substring(0, lastSlashIndex);
}
+ public static String getFileName(String path) {
+ if (path.indexOf("/") < 0) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(lastSlashIndex+1);
+ }
+
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
@@ -163,6 +212,11 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdownNow();
}
}
+
+ if (shouldSaveMetadata) {
+ SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ }
+
}
private synchronized void writeCurrentBufferToService() throws IOException {