aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
committershibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
commit40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
treea085e2e33851c4d916bef2952abc7cfbfe95ee88 /other/java
parentd892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parentfbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
downloadseaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz
seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml15
-rw-r--r--other/java/client/pom_debug.xml139
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ChunkCache.java27
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java9
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java33
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/Gzip.java37
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java55
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java150
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java43
-rw-r--r--other/java/client/src/main/proto/filer.proto69
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java42
-rw-r--r--other/java/hdfs2/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs2/pom.xml2
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java12
-rw-r--r--other/java/hdfs3/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs3/pom.xml2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java12
17 files changed, 576 insertions, 75 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 0c585a941..a8b561251 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.4</version>
+ <version>1.2.8</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -88,8 +89,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -97,9 +98,11 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
<pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
</configuration>
<executions>
<execution>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
new file mode 100644
index 000000000..88447f7e7
--- /dev/null
+++ b/other/java/client/pom_debug.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.2.8</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
new file mode 100644
index 000000000..e249d4524
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -0,0 +1,27 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class ChunkCache {
+
+ private final Cache<String, byte[]> cache;
+
+ public ChunkCache(int maxEntries) {
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(1, TimeUnit.HOURS)
+ .build();
+ }
+
+ public byte[] getChunk(String fileId) {
+ return this.cache.getIfPresent(fileId);
+ }
+
+ public void setChunk(String fileId, byte[] data) {
+ this.cache.put(fileId, data);
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 84aa26ad9..ef32c7e9a 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -14,7 +14,7 @@ public class FilerClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
- private FilerGrpcClient filerGrpcClient;
+ private final FilerGrpcClient filerGrpcClient;
public FilerClient(String host, int grpcPort) {
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
@@ -181,7 +181,7 @@ public class FilerClient {
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
- while (iter.hasNext()){
+ while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
entries.add(fixEntryAfterReading(resp.getEntry()));
}
@@ -195,9 +195,12 @@ public class FilerClient {
.setDirectory(directory)
.setName(entryName)
.build()).getEntry();
+ if (entry == null) {
+ return null;
+ }
return fixEntryAfterReading(entry);
} catch (Exception e) {
- if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){
+ if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
}
LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3626c76de..3f5d1e8e9 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -14,12 +14,6 @@ import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
-
- private final ManagedChannel channel;
- private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
- private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
- private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
-
static SslContext sslContext;
static {
@@ -30,6 +24,14 @@ public class FilerGrpcClient {
}
}
+ private final ManagedChannel channel;
+ private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
+ private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
+ private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+ private boolean cipher = false;
+ private String collection = "";
+ private String replication = "";
+
public FilerGrpcClient(String host, int grpcPort) {
this(host, grpcPort, sslContext);
}
@@ -42,6 +44,13 @@ public class FilerGrpcClient {
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
+ this.getBlockingStub().getFilerConfiguration(
+ FilerProto.GetFilerConfigurationRequest.newBuilder().build());
+ cipher = filerConfigurationResponse.getCipher();
+ collection = filerConfigurationResponse.getCollection();
+ replication = filerConfigurationResponse.getReplication();
+
}
public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
@@ -51,6 +60,18 @@ public class FilerGrpcClient {
futureStub = SeaweedFilerGrpc.newFutureStub(channel);
}
+ public boolean isCipher() {
+ return cipher;
+ }
+
+ public String getCollection() {
+ return collection;
+ }
+
+ public String getReplication() {
+ return replication;
+ }
+
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
new file mode 100644
index 000000000..248285dd3
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -0,0 +1,37 @@
+package seaweedfs.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class Gzip {
+ public static byte[] compress(byte[] data) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
+ GZIPOutputStream gzip = new GZIPOutputStream(bos);
+ gzip.write(data);
+ gzip.close();
+ byte[] compressed = bos.toByteArray();
+ bos.close();
+ return compressed;
+ }
+
+ public static byte[] decompress(byte[] compressed) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ return readAll(gis);
+ }
+
+ private static byte[] readAll(InputStream input) throws IOException {
+ try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
+ byte[] buffer = new byte[4096];
+ int n;
+ while (-1 != (n = input.read(buffer))) {
+ output.write(buffer, 0, n);
+ }
+ return output.toByteArray();
+ }
+ }
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
new file mode 100644
index 000000000..8d0ebd755
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
@@ -0,0 +1,55 @@
+package seaweedfs.client;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.security.SecureRandom;
+
+public class SeaweedCipher {
+ // AES-GCM parameters
+ public static final int AES_KEY_SIZE = 256; // in bits
+ public static final int GCM_NONCE_LENGTH = 12; // in bytes
+ public static final int GCM_TAG_LENGTH = 16; // in bytes
+
+ private static SecureRandom random = new SecureRandom();
+
+ public static byte[] genCipherKey() throws Exception {
+ byte[] key = new byte[AES_KEY_SIZE / 8];
+ random.nextBytes(key);
+ return key;
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception {
+ return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey);
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception {
+
+ final byte[] nonce = new byte[GCM_NONCE_LENGTH];
+ random.nextBytes(nonce);
+ GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+
+ Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec);
+
+ byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length);
+
+ byte[] iv = AES_cipherInstance.getIV();
+ byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH];
+ System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length);
+
+ return message;
+ }
+
+ public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception {
+ final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+ AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params);
+ byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH);
+ return decryptedText;
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index b08c14467..7be39da53 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -6,16 +6,19 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.util.*;
public class SeaweedRead {
- // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
+ static ChunkCache chunkCache = new ChunkCache(1000);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -31,7 +34,7 @@ public class SeaweedRead {
}
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
+ .getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
@@ -56,26 +59,38 @@ public class SeaweedRead {
}
private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
+
+ if (chunkData == null) {
+ chunkData = doFetchFullChunkData(chunkView, locations);
+ }
+
+ int len = (int) chunkView.size;
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
+ System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
+
+ chunkCache.setChunk(chunkView.fileId, chunkData);
+
+ return len;
+ }
+
+ private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
- if (!chunkView.isFullChunk) {
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
- request.setHeader(HttpHeaders.RANGE,
- String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1));
- }
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+
+ byte[] data = null;
try {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
- int len = (int) (chunkView.logicOffset - position + chunkView.size);
- OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
- entity.writeTo(outputStream);
- // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
-
- return len;
+ data = EntityUtils.toByteArray(entity);
} finally {
if (client instanceof Closeable) {
@@ -83,6 +98,21 @@ public class SeaweedRead {
t.close();
}
}
+
+ if (chunkView.isGzipped) {
+ data = Gzip.decompress(data);
+ }
+
+ if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
+ try {
+ data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
+ } catch (Exception e) {
+ throw new IOException("fail to decrypt", e);
+ }
+ }
+
+ return data;
+
}
protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
@@ -93,11 +123,13 @@ public class SeaweedRead {
if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
- chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
- isFullChunk
+ chunk.fileId,
+ offset - chunk.start,
+ Math.min(chunk.stop, stop) - offset,
+ offset,
+ isFullChunk,
+ chunk.cipherKey,
+ chunk.isGzipped
));
offset = Math.min(chunk.stop, stop);
}
@@ -127,11 +159,13 @@ public class SeaweedRead {
List<VisibleInterval> newVisibles,
FilerProto.FileChunk chunk) {
VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- true
+ chunk.getOffset(),
+ chunk.getOffset() + chunk.getSize(),
+ chunk.getFileId(),
+ chunk.getMtime(),
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsGzipped()
);
// easy cases to speed up
@@ -147,21 +181,25 @@ public class SeaweedRead {
for (VisibleInterval v : visibles) {
if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- false
+ v.start,
+ chunk.getOffset(),
+ v.fileId,
+ v.modifiedTime,
+ false,
+ v.cipherKey,
+ v.isGzipped
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
if (v.start < chunkStop && chunkStop < v.stop) {
newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false
+ chunkStop,
+ v.stop,
+ v.fileId,
+ v.modifiedTime,
+ false,
+ v.cipherKey,
+ v.isGzipped
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -208,24 +246,30 @@ public class SeaweedRead {
public final long modifiedTime;
public final String fileId;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isGzipped;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isGzipped = isGzipped;
}
@Override
public String toString() {
return "VisibleInterval{" +
- "start=" + start +
- ", stop=" + stop +
- ", modifiedTime=" + modifiedTime +
- ", fileId='" + fileId + '\'' +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "start=" + start +
+ ", stop=" + stop +
+ ", modifiedTime=" + modifiedTime +
+ ", fileId='" + fileId + '\'' +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isGzipped=" + isGzipped +
+ '}';
}
}
@@ -235,24 +279,30 @@ public class SeaweedRead {
public final long size;
public final long logicOffset;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isGzipped;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isGzipped = isGzipped;
}
@Override
public String toString() {
return "ChunkView{" +
- "fileId='" + fileId + '\'' +
- ", offset=" + offset +
- ", size=" + size +
- ", logicOffset=" + logicOffset +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "fileId='" + fileId + '\'' +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", logicOffset=" + logicOffset +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isGzipped=" + isGzipped +
+ '}';
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 0663e8d98..dc6203e52 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,5 +1,6 @@
package seaweedfs.client;
+import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
@@ -11,9 +12,12 @@ import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.security.SecureRandom;
public class SeaweedWrite {
+ private static SecureRandom random = new SecureRandom();
+
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
final FilerGrpcClient filerGrpcClient,
@@ -22,10 +26,9 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection("")
- .setReplication(replication)
+ .setCollection(filerGrpcClient.getCollection())
+ .setReplication(replication == null ? filerGrpcClient.getReplication() : replication)
.setDataCenter("")
- .setReplication("")
.setTtlSec(0)
.build());
String fileId = response.getFileId();
@@ -33,7 +36,17 @@ public class SeaweedWrite {
String auth = response.getAuth();
String targetUrl = String.format("http://%s/%s", url, fileId);
- String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength);
+ ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
+ byte[] cipherKey = null;
+ if (filerGrpcClient.isCipher()) {
+ cipherKey = genCipherKey();
+ cipherKeyString = ByteString.copyFrom(cipherKey);
+ }
+
+ String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
+
+ // cache fileId ~ bytes
+ SeaweedRead.chunkCache.setChunk(fileId, bytes);
entry.addChunks(FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
@@ -41,6 +54,7 @@ public class SeaweedWrite {
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
+ .setCipherKey(cipherKeyString)
);
}
@@ -58,11 +72,22 @@ public class SeaweedWrite {
private static String multipartUpload(String targetUrl,
String auth,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
HttpClient client = new DefaultHttpClient();
- InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ InputStream inputStream = null;
+ if (cipherKey == null || cipherKey.length == 0) {
+ inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ } else {
+ try {
+ byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey);
+ inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length);
+ } catch (Exception e) {
+ throw new IOException("fail to encrypt data", e);
+ }
+ }
HttpPost post = new HttpPost(targetUrl);
if (auth != null && auth.length() != 0) {
@@ -92,4 +117,10 @@ public class SeaweedWrite {
}
}
+
+ private static byte[] genCipherKey() {
+ byte[] b = new byte[32];
+ random.nextBytes(b);
+ return b;
+ }
}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 6357d971f..1fc8ef63d 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -21,6 +21,9 @@ service SeaweedFiler {
rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) {
}
+ rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) {
+ }
+
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
}
@@ -42,6 +45,15 @@ service SeaweedFiler {
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
+ rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
+ }
+
+ rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -96,6 +108,8 @@ message FileChunk {
string source_file_id = 6; // to be deprecated
FileId fid = 7;
FileId source_fid = 8;
+ bytes cipher_key = 9;
+ bool is_gzipped = 10;
}
message FileId {
@@ -118,6 +132,7 @@ message FuseAttributes {
string user_name = 11; // for hdfs
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
+ bytes md5 = 14;
}
message CreateEntryRequest {
@@ -137,6 +152,14 @@ message UpdateEntryRequest {
message UpdateEntryResponse {
}
+message AppendToEntryRequest {
+ string directory = 1;
+ string entry_name = 2;
+ repeated FileChunk chunks = 3;
+}
+message AppendToEntryResponse {
+}
+
message DeleteEntryRequest {
string directory = 1;
string name = 2;
@@ -147,6 +170,7 @@ message DeleteEntryRequest {
}
message DeleteEntryResponse {
+ string error = 1;
}
message AtomicRenameEntryRequest {
@@ -165,6 +189,7 @@ message AssignVolumeRequest {
string replication = 3;
int32 ttl_sec = 4;
string data_center = 5;
+ string parent_path = 6;
}
message AssignVolumeResponse {
@@ -173,6 +198,9 @@ message AssignVolumeResponse {
string public_url = 3;
int32 count = 4;
string auth = 5;
+ string collection = 6;
+ string replication = 7;
+ string error = 8;
}
message LookupVolumeRequest {
@@ -219,4 +247,45 @@ message GetFilerConfigurationResponse {
string replication = 2;
string collection = 3;
uint32 max_mb = 4;
+ string dir_buckets = 5;
+ bool cipher = 7;
+}
+
+message SubscribeMetadataRequest {
+ string client_name = 1;
+ string path_prefix = 2;
+ int64 since_ns = 3;
+}
+message SubscribeMetadataResponse {
+ string directory = 1;
+ EventNotification event_notification = 2;
+ int64 ts_ns = 3;
+}
+
+message LogEntry {
+ int64 ts_ns = 1;
+ int32 partition_key_hash = 2;
+ bytes data = 3;
+}
+
+message KeepConnectedRequest {
+ string name = 1;
+ uint32 grpc_port = 2;
+ repeated string resources = 3;
+}
+message KeepConnectedResponse {
+}
+
+message LocateBrokerRequest {
+ string resource = 1;
+}
+message LocateBrokerResponse {
+ bool found = 1;
+ // if found, send the exact address
+ // if not found, send the full list of existing brokers
+ message Resource {
+ string grpc_addresses = 1;
+ int32 resource_count = 2;
+ }
+ repeated Resource resources = 2;
}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
new file mode 100644
index 000000000..7b5e53e19
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
@@ -0,0 +1,42 @@
+package seaweedfs.client;
+
+import org.junit.Test;
+
+import java.util.Base64;
+
+import static seaweedfs.client.SeaweedCipher.decrypt;
+import static seaweedfs.client.SeaweedCipher.encrypt;
+
+public class SeaweedCipherTest {
+
+ @Test
+ public void testSameAsGoImplemnetation() throws Exception {
+ byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+ @Test
+ public void testEncryptDecrypt() throws Exception {
+ byte[] secretKey = SeaweedCipher.genCipherKey();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+}
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index d818bc878..bef448f3f 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index b8c8cb891..f3086fab8 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 774c090e8..9617a38be 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -64,6 +64,16 @@ public class SeaweedFileSystemStore {
public FileStatus[] listEntries(final Path path) {
LOG.debug("listEntries path: {}", path);
+ FileStatus pathStatus = getFileStatus(path);
+
+ if (pathStatus == null) {
+ return new FileStatus[0];
+ }
+
+ if (!pathStatus.isDirectory()) {
+ return new FileStatus[]{pathStatus};
+ }
+
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
@@ -74,7 +84,9 @@ public class SeaweedFileSystemStore {
fileStatuses.add(fileStatus);
}
+ LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
return fileStatuses.toArray(new FileStatus[0]);
+
}
public FileStatus getFileStatus(final Path path) {
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index ca53ffd22..f2056b7b1 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index f5207213c..6ca210f78 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 774c090e8..9617a38be 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -64,6 +64,16 @@ public class SeaweedFileSystemStore {
public FileStatus[] listEntries(final Path path) {
LOG.debug("listEntries path: {}", path);
+ FileStatus pathStatus = getFileStatus(path);
+
+ if (pathStatus == null) {
+ return new FileStatus[0];
+ }
+
+ if (!pathStatus.isDirectory()) {
+ return new FileStatus[]{pathStatus};
+ }
+
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
@@ -74,7 +84,9 @@ public class SeaweedFileSystemStore {
fileStatuses.add(fileStatus);
}
+ LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
return fileStatuses.toArray(new FileStatus[0]);
+
}
public FileStatus getFileStatus(final Path path) {