aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-04 20:16:08 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-04 20:16:08 -0800
commit7f90d14f100f9ce69b6b05f6b8f80823f4c69fdf (patch)
treef6438e29b855636594ad1add4887bf46a42bbf2f
parent502554887f58915c077462372db4e2813eac3f92 (diff)
downloadseaweedfs-7f90d14f100f9ce69b6b05f6b8f80823f4c69fdf.tar.xz
seaweedfs-7f90d14f100f9ce69b6b05f6b8f80823f4c69fdf.zip
Java: add SeaweedOutputStream example
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java8
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java58
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java2
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java48
4 files changed, 110 insertions, 6 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 519ff0fd9..8b26c242c 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -28,11 +28,13 @@ public class SeaweedInputStream extends InputStream {
public SeaweedInputStream(
final FilerGrpcClient filerGrpcClient,
- final String dir, final String name) throws IOException {
+ final String fullpath) throws IOException {
this.filerGrpcClient = filerGrpcClient;
- this.path = dir;
+ this.path = fullpath;
FilerClient filerClient = new FilerClient(filerGrpcClient);
- this.entry = filerClient.lookupEntry(dir, name);
+ this.entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(fullpath),
+ SeaweedOutputStream.getFileName(fullpath));
this.contentLength = SeaweedRead.fileSize(entry);
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index b09a15a5c..a98bbd1ab 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -14,7 +14,7 @@ import java.util.concurrent.*;
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
+ protected final boolean supportFlush = false; // true;
private final FilerGrpcClient filerGrpcClient;
private final String path;
private final int bufferSize;
@@ -22,7 +22,6 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry;
- protected final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
@@ -32,6 +31,45 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer;
private long outputIndex;
private String replication = "000";
+ private boolean shouldSaveMetadata = false;
+
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.path = fullpath;
+ this.position = 0;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = 8 * 1024 * 1024;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+
+ long now = System.currentTimeMillis() / 1000L;
+
+ this.entry = FilerProto.Entry.newBuilder()
+ .setName(getFileName(path))
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(0755)
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .clearGroupName()
+ );
+ this.shouldSaveMetadata = true;
+
+ }
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
@@ -66,9 +104,20 @@ public class SeaweedOutputStream extends OutputStream {
return path;
}
int lastSlashIndex = path.lastIndexOf("/");
+ if (lastSlashIndex == 0) {
+ return "/";
+ }
return path.substring(0, lastSlashIndex);
}
+ public static String getFileName(String path) {
+ if (path.indexOf("/") < 0) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(lastSlashIndex+1);
+ }
+
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
@@ -163,6 +212,11 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdownNow();
}
}
+
+ if (shouldSaveMetadata) {
+ SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ }
+
}
private synchronized void writeCurrentBufferToService() throws IOException {
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
index fad1471b6..12eab1a2c 100644
--- a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
@@ -23,7 +23,7 @@ public class UnzipFile {
long localProcessTime = startTime2 - startTime;
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
- filerGrpcClient, "/", "test.zip");
+ filerGrpcClient, "/test.zip");
parseZip(seaweedInputStream);
long swProcessTime = System.currentTimeMillis() - startTime2;
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java
new file mode 100644
index 000000000..b0bd54997
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java
@@ -0,0 +1,48 @@
+package com.seaweedfs.examples;
+
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.SeaweedInputStream;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public class WriteFile {
+
+ public static void main(String[] args) throws IOException {
+
+ FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
+
+ SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
+ filerGrpcClient, "/test.zip");
+ unZipFiles(filerGrpcClient, seaweedInputStream);
+
+ }
+
+ public static void unZipFiles(FilerGrpcClient filerGrpcClient, InputStream is) throws IOException {
+ ZipInputStream zin = new ZipInputStream(is);
+ ZipEntry ze;
+ while ((ze = zin.getNextEntry()) != null) {
+
+ String filename = ze.getName();
+ if (filename.indexOf("/") >= 0) {
+ filename = filename.substring(filename.lastIndexOf("/") + 1);
+ }
+ if (filename.length()==0) {
+ continue;
+ }
+
+ SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerGrpcClient, "/test/"+filename);
+ byte[] bytesIn = new byte[16 * 1024];
+ int read = 0;
+ while ((read = zin.read(bytesIn))!=-1) {
+ seaweedOutputStream.write(bytesIn,0,read);
+ }
+ seaweedOutputStream.close();
+
+ System.out.println(ze.getName());
+ }
+ }
+}