aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-11-13 12:10:55 -0800
committerChris Lu <chris.lu@gmail.com>2020-11-13 12:10:55 -0800
commit3e362451d226d9e19b4b652a02926dedc02f6cf9 (patch)
treebd06b7aa1418c8bd7b5b8a695772b7fbba645cfb
parent0d5355c6141f4d5ff83933f0b7803e88ae1df90b (diff)
downloadseaweedfs-3e362451d226d9e19b4b652a02926dedc02f6cf9.tar.xz
seaweedfs-3e362451d226d9e19b4b652a02926dedc02f6cf9.zip
add example of watch files
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java15
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java (renamed from other/java/examples/src/main/java/com/example/test/Example.java)12
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java42
3 files changed, 59 insertions, 10 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 035b2c852..7338d5bee 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -275,9 +275,9 @@ public class FilerClient {
try {
FilerProto.CreateEntryResponse createEntryResponse =
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
return true;
}
@@ -333,4 +333,13 @@ public class FilerClient {
return true;
}
+ public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
+ return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
+ .setPathPrefix(prefix)
+ .setClientName(clientName)
+ .setSinceNs(sinceNs)
+ .build()
+ );
+ }
+
}
diff --git a/other/java/examples/src/main/java/com/example/test/Example.java b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
index 3d22329a8..0529a5c73 100644
--- a/other/java/examples/src/main/java/com/example/test/Example.java
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
@@ -1,4 +1,4 @@
-package com.example.test;
+package com.seaweedfs.examples;
import seaweed.hdfs.SeaweedInputStream;
import seaweedfs.client.FilerClient;
@@ -10,22 +10,20 @@ import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
-public class Example {
-
- public static FilerClient filerClient = new FilerClient("localhost", 18888);
- public static FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
+public class UnzipFile {
public static void main(String[] args) throws IOException {
+ FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
+ FilerClient filerClient = new FilerClient(filerGrpcClient);
+
long startTime = System.currentTimeMillis();
- // 本地模式,速度很快
parseZip("/Users/chris/tmp/test.zip");
long startTime2 = System.currentTimeMillis();
long localProcessTime = startTime2 - startTime;
- // swfs读取,慢
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
filerGrpcClient,
new org.apache.hadoop.fs.FileSystem.Statistics(""),
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java b/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java
new file mode 100644
index 000000000..c4f4c81b0
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java
@@ -0,0 +1,42 @@
+package com.seaweedfs.examples;
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class WatchFiles {
+
+ public static void main(String[] args) throws IOException {
+ FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
+ FilerClient filerClient = new FilerClient(filerGrpcClient);
+
+ Iterator<FilerProto.SubscribeMetadataResponse> watch = filerClient.watch(
+ "/buckets",
+ "exampleClient",
+ System.currentTimeMillis() * 1000000L
+ );
+
+ while (watch.hasNext()) {
+ FilerProto.SubscribeMetadataResponse event = watch.next();
+ FilerProto.EventNotification notification = event.getEventNotification();
+ if (notification.getNewParentPath() != null) {
+ // move an entry to a new directory, possibly with a new name
+ if (notification.hasOldEntry() && notification.hasNewEntry()) {
+ System.out.println("move " + event.getDirectory() + "/" + notification.getOldEntry().getName() + " to " + notification.getNewParentPath() + "/" + notification.getNewEntry().getName());
+ } else {
+ System.out.println("this should not happen.");
+ }
+ } else if (notification.hasNewEntry() && !notification.hasOldEntry()) {
+ System.out.println("create entry " + event.getDirectory() + "/" + notification.getNewEntry().getName());
+ } else if (!notification.hasNewEntry() && notification.hasOldEntry()) {
+ System.out.println("delete entry " + event.getDirectory() + "/" + notification.getOldEntry().getName());
+ } else if (notification.hasNewEntry() && notification.hasOldEntry()) {
+ System.out.println("updated entry " + event.getDirectory() + "/" + notification.getNewEntry().getName());
+ }
+ }
+
+ }
+}