diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /other | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'other')
17 files changed, 576 insertions, 75 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 0c585a941..a8b561251 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -1,10 +1,11 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.2.4</version> + <version>1.2.8</version> <parent> <groupId>org.sonatype.oss</groupId> @@ -88,8 +89,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>7</source> - <target>7</target> + <source>8</source> + <target>8</target> </configuration> </plugin> <plugin> @@ -97,9 +98,11 @@ <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> - <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> <pluginId>grpc-java</pluginId> - <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> </configuration> <executions> <execution> diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml new file mode 100644 index 000000000..88447f7e7 --- /dev/null +++ b/other/java/client/pom_debug.xml @@ -0,0 +1,139 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.github.chrislusf</groupId> + <artifactId>seaweedfs-client</artifactId> + <version>1.2.8</version> + + <parent> + <groupId>org.sonatype.oss</groupId> + <artifactId>oss-parent</artifactId> + <version>9</version> + </parent> + + <properties> + <protobuf.version>3.9.1</protobuf.version> + <!-- follow https://github.com/grpc/grpc-java --> + <grpc.version>1.23.0</grpc.version> + <guava.version>28.0-jre</guava.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.moandjiezana.toml</groupId> + <artifactId>toml4j</artifactId> + <version>0.7.2</version> + </dependency> + <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>4.5.6</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.6.2</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> + <target>8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.6.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.2.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar-no-fork</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9.1</version> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java new file mode 100644 index 000000000..e249d4524 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -0,0 +1,27 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class ChunkCache { + + private final Cache<String, byte[]> cache; + + public ChunkCache(int maxEntries) { + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(1, TimeUnit.HOURS) + .build(); + } + + public byte[] getChunk(String fileId) { + return this.cache.getIfPresent(fileId); + } + + public void setChunk(String fileId, byte[] data) { + this.cache.put(fileId, data); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 84aa26ad9..ef32c7e9a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -14,7 +14,7 @@ public class FilerClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private FilerGrpcClient filerGrpcClient; + private final FilerGrpcClient filerGrpcClient; public FilerClient(String host, int grpcPort) { filerGrpcClient = new FilerGrpcClient(host, grpcPort); @@ -181,7 +181,7 @@ public class FilerClient { .setLimit(limit) .build()); List<FilerProto.Entry> entries = new ArrayList<>(); - while (iter.hasNext()){ + while (iter.hasNext()) { FilerProto.ListEntriesResponse resp = iter.next(); entries.add(fixEntryAfterReading(resp.getEntry())); } @@ -195,9 +195,12 @@ public class FilerClient { .setDirectory(directory) .setName(entryName) .build()).getEntry(); + if (entry == null) { + return null; + } return fixEntryAfterReading(entry); } catch (Exception e) { - if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){ + if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) { return null; } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 3626c76de..3f5d1e8e9 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -14,12 +14,6 @@ import java.util.concurrent.TimeUnit; public class FilerGrpcClient { private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); - - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; - static SslContext sslContext; static { @@ -30,6 +24,14 @@ public class FilerGrpcClient { } } + private final ManagedChannel channel; + private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; + private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; + private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + private boolean cipher = false; + private String collection = ""; + private String replication = ""; + public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); } @@ -42,6 +44,13 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = + this.getBlockingStub().getFilerConfiguration( + FilerProto.GetFilerConfigurationRequest.newBuilder().build()); + cipher = filerConfigurationResponse.getCipher(); + collection = filerConfigurationResponse.getCollection(); + replication = filerConfigurationResponse.getReplication(); + } public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { @@ -51,6 +60,18 @@ public class FilerGrpcClient { futureStub = SeaweedFilerGrpc.newFutureStub(channel); } + public boolean isCipher() { + return cipher; + } + + public String getCollection() { + return collection; + } + + public String getReplication() { + return replication; + } + public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java new file mode 100644 index 000000000..248285dd3 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -0,0 +1,37 @@ +package seaweedfs.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Gzip { + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length); + GZIPOutputStream gzip = new GZIPOutputStream(bos); + gzip.write(data); + gzip.close(); + byte[] compressed = bos.toByteArray(); + bos.close(); + return compressed; + } + + public static byte[] decompress(byte[] compressed) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } + + private static byte[] readAll(InputStream input) throws IOException { + try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + byte[] buffer = new byte[4096]; + int n; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + } + return output.toByteArray(); + } + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java new file mode 100644 index 000000000..8d0ebd755 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -0,0 +1,55 @@ +package seaweedfs.client; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.security.SecureRandom; + +public class SeaweedCipher { + // AES-GCM parameters + public static final int AES_KEY_SIZE = 256; // in bits + public static final int GCM_NONCE_LENGTH = 12; // in bytes + public static final int GCM_TAG_LENGTH = 16; // in bytes + + private static SecureRandom random = new SecureRandom(); + + public static byte[] genCipherKey() throws Exception { + byte[] key = new byte[AES_KEY_SIZE / 8]; + random.nextBytes(key); + return key; + } + + public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception { + return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey); + } + + public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception { + + final byte[] nonce = new byte[GCM_NONCE_LENGTH]; + random.nextBytes(nonce); + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + + Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec); + + byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); + + byte[] iv = AES_cipherInstance.getIV(); + byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); + + return message; + } + + public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception { + final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params); + byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH); + return decryptedText; + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index b08c14467..7be39da53 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -6,16 +6,19 @@ import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { - // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + + static ChunkCache chunkCache = new ChunkCache(1000); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, @@ -31,7 +34,7 @@ public class SeaweedRead { } FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); + .getBlockingStub().lookupVolume(lookupRequest.build()); Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); @@ -56,26 +59,38 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + + byte[] chunkData = chunkCache.getChunk(chunkView.fileId); + + if (chunkData == null) { + chunkData = doFetchFullChunkData(chunkView, locations); + } + + int len = (int) chunkView.size; + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", + chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); + System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); + + chunkCache.setChunk(chunkView.fileId, chunkData); + + return len; + } + + private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); - } + request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + + byte[] data = null; try { HttpResponse response = client.execute(request); HttpEntity entity = response.getEntity(); - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); - - return len; + data = EntityUtils.toByteArray(entity); } finally { if (client instanceof Closeable) { @@ -83,6 +98,21 @@ public class SeaweedRead { t.close(); } } + + if (chunkView.isGzipped) { + data = Gzip.decompress(data); + } + + if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } + } + + return data; + } protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { @@ -93,11 +123,13 @@ public class SeaweedRead { if (chunk.start <= offset && offset < chunk.stop && offset < stop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, - isFullChunk + chunk.fileId, + offset - chunk.start, + Math.min(chunk.stop, stop) - offset, + offset, + isFullChunk, + chunk.cipherKey, + chunk.isGzipped )); offset = Math.min(chunk.stop, stop); } @@ -127,11 +159,13 @@ public class SeaweedRead { List<VisibleInterval> newVisibles, FilerProto.FileChunk chunk) { VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - true + chunk.getOffset(), + chunk.getOffset() + chunk.getSize(), + chunk.getFileId(), + chunk.getMtime(), + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsGzipped() ); // easy cases to speed up @@ -147,21 +181,25 @@ public class SeaweedRead { for (VisibleInterval v : visibles) { if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - false + v.start, + chunk.getOffset(), + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } long chunkStop = chunk.getOffset() + chunk.getSize(); if (v.start < chunkStop && chunkStop < v.stop) { newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false + chunkStop, + v.stop, + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -208,24 +246,30 @@ public class SeaweedRead { public final long modifiedTime; public final String fileId; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "VisibleInterval{" + - "start=" + start + - ", stop=" + stop + - ", modifiedTime=" + modifiedTime + - ", fileId='" + fileId + '\'' + - ", isFullChunk=" + isFullChunk + - '}'; + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } @@ -235,24 +279,30 @@ public class SeaweedRead { public final long size; public final long logicOffset; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "ChunkView{" + - "fileId='" + fileId + '\'' + - ", offset=" + offset + - ", size=" + size + - ", logicOffset=" + logicOffset + - ", isFullChunk=" + isFullChunk + - '}'; + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 0663e8d98..dc6203e52 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,5 +1,6 @@ package seaweedfs.client; +import com.google.protobuf.ByteString; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; @@ -11,9 +12,12 @@ import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; public class SeaweedWrite { + private static SecureRandom random = new SecureRandom(); + public static void writeData(FilerProto.Entry.Builder entry, final String replication, final FilerGrpcClient filerGrpcClient, @@ -22,10 +26,9 @@ public class SeaweedWrite { final long bytesOffset, final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) + .setCollection(filerGrpcClient.getCollection()) + .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) .setDataCenter("") - .setReplication("") .setTtlSec(0) .build()); String fileId = response.getFileId(); @@ -33,7 +36,17 @@ public class SeaweedWrite { String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength); + ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; + byte[] cipherKey = null; + if (filerGrpcClient.isCipher()) { + cipherKey = genCipherKey(); + cipherKeyString = ByteString.copyFrom(cipherKey); + } + + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); + + // cache fileId ~ bytes + SeaweedRead.chunkCache.setChunk(fileId, bytes); entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) @@ -41,6 +54,7 @@ public class SeaweedWrite { .setSize(bytesLength) .setMtime(System.currentTimeMillis() / 10000L) .setETag(etag) + .setCipherKey(cipherKeyString) ); } @@ -58,11 +72,22 @@ public class SeaweedWrite { private static String multipartUpload(String targetUrl, String auth, final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { HttpClient client = new DefaultHttpClient(); - InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + InputStream inputStream = null; + if (cipherKey == null || cipherKey.length == 0) { + inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + } else { + try { + byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey); + inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length); + } catch (Exception e) { + throw new IOException("fail to encrypt data", e); + } + } HttpPost post = new HttpPost(targetUrl); if (auth != null && auth.length() != 0) { @@ -92,4 +117,10 @@ public class SeaweedWrite { } } + + private static byte[] genCipherKey() { + byte[] b = new byte[32]; + random.nextBytes(b); + return b; + } } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6357d971f..1fc8ef63d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -21,6 +21,9 @@ service SeaweedFiler { rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) { } + rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) { + } + rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } @@ -42,6 +45,15 @@ service SeaweedFiler { rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } + rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { + } + + rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { + } + + rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { + } + } ////////////////////////////////////////////////// @@ -96,6 +108,8 @@ message FileChunk { string source_file_id = 6; // to be deprecated FileId fid = 7; FileId source_fid = 8; + bytes cipher_key = 9; + bool is_gzipped = 10; } message FileId { @@ -118,6 +132,7 @@ message FuseAttributes { string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; + bytes md5 = 14; } message CreateEntryRequest { @@ -137,6 +152,14 @@ message UpdateEntryRequest { message UpdateEntryResponse { } +message AppendToEntryRequest { + string directory = 1; + string entry_name = 2; + repeated FileChunk chunks = 3; +} +message AppendToEntryResponse { +} + message DeleteEntryRequest { string directory = 1; string name = 2; @@ -147,6 +170,7 @@ message DeleteEntryRequest { } message DeleteEntryResponse { + string error = 1; } message AtomicRenameEntryRequest { @@ -165,6 +189,7 @@ message AssignVolumeRequest { string replication = 3; int32 ttl_sec = 4; string data_center = 5; + string parent_path = 6; } message AssignVolumeResponse { @@ -173,6 +198,9 @@ message AssignVolumeResponse { string public_url = 3; int32 count = 4; string auth = 5; + string collection = 6; + string replication = 7; + string error = 8; } message LookupVolumeRequest { @@ -219,4 +247,45 @@ message GetFilerConfigurationResponse { string replication = 2; string collection = 3; uint32 max_mb = 4; + string dir_buckets = 5; + bool cipher = 7; +} + +message SubscribeMetadataRequest { + string client_name = 1; + string path_prefix = 2; + int64 since_ns = 3; +} +message SubscribeMetadataResponse { + string directory = 1; + EventNotification event_notification = 2; + int64 ts_ns = 3; +} + +message LogEntry { + int64 ts_ns = 1; + int32 partition_key_hash = 2; + bytes data = 3; +} + +message KeepConnectedRequest { + string name = 1; + uint32 grpc_port = 2; + repeated string resources = 3; +} +message KeepConnectedResponse { +} + +message LocateBrokerRequest { + string resource = 1; +} +message LocateBrokerResponse { + bool found = 1; + // if found, send the exact address + // if not found, send the full list of existing brokers + message Resource { + string grpc_addresses = 1; + int32 resource_count = 2; + } + repeated Resource resources = 2; } diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java new file mode 100644 index 000000000..7b5e53e19 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java @@ -0,0 +1,42 @@ +package seaweedfs.client; + +import org.junit.Test; + +import java.util.Base64; + +import static seaweedfs.client.SeaweedCipher.decrypt; +import static seaweedfs.client.SeaweedCipher.encrypt; + +public class SeaweedCipherTest { + + @Test + public void testSameAsGoImplemnetation() throws Exception { + byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + + @Test + public void testEncryptDecrypt() throws Exception { + byte[] secretKey = SeaweedCipher.genCipherKey(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + +} diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index d818bc878..bef448f3f 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index b8c8cb891..f3086fab8 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.2.4</seaweedfs.client.version> + <seaweedfs.client.version>1.2.8</seaweedfs.client.version> <hadoop.version>2.9.2</hadoop.version> </properties> diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 774c090e8..9617a38be 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -64,6 +64,16 @@ public class SeaweedFileSystemStore { public FileStatus[] listEntries(final Path path) { LOG.debug("listEntries path: {}", path); + FileStatus pathStatus = getFileStatus(path); + + if (pathStatus == null) { + return new FileStatus[0]; + } + + if (!pathStatus.isDirectory()) { + return new FileStatus[]{pathStatus}; + } + List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); @@ -74,7 +84,9 @@ public class SeaweedFileSystemStore { fileStatuses.add(fileStatus); } + LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size()); return fileStatuses.toArray(new FileStatus[0]); + } public FileStatus getFileStatus(final Path path) { diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index ca53ffd22..f2056b7b1 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index f5207213c..6ca210f78 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.2.4</seaweedfs.client.version> + <seaweedfs.client.version>1.2.8</seaweedfs.client.version> <hadoop.version>3.1.1</hadoop.version> </properties> diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 774c090e8..9617a38be 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -64,6 +64,16 @@ public class SeaweedFileSystemStore { public FileStatus[] listEntries(final Path path) { LOG.debug("listEntries path: {}", path); + FileStatus pathStatus = getFileStatus(path); + + if (pathStatus == null) { + return new FileStatus[0]; + } + + if (!pathStatus.isDirectory()) { + return new FileStatus[]{pathStatus}; + } + List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); @@ -74,7 +84,9 @@ public class SeaweedFileSystemStore { fileStatuses.add(fileStatus); } + LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size()); return fileStatuses.toArray(new FileStatus[0]); + } public FileStatus getFileStatus(final Path path) { |
