From d8dec2323bde1a5ab787b719a240969852004456 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 24 Feb 2020 14:34:14 -0800 Subject: s3: move buckets folder configuration to filer --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6357d971f..909458daf 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -219,4 +219,5 @@ message GetFilerConfigurationResponse { string replication = 2; string collection = 3; uint32 max_mb = 4; + string dir_buckets = 5; } -- cgit v1.2.3 From 6ab7368ef2556ef086d13c6d0d4454f1e98a5cd8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 24 Feb 2020 22:28:45 -0800 Subject: filer: dynamically create bucket under /buckets folder --- other/java/client/src/main/proto/filer.proto | 3 +++ 1 file changed, 3 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 909458daf..6892effe8 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -165,6 +165,7 @@ message AssignVolumeRequest { string replication = 3; int32 ttl_sec = 4; string data_center = 5; + string parent_path = 6; } message AssignVolumeResponse { @@ -173,6 +174,8 @@ message AssignVolumeResponse { string public_url = 3; int32 count = 4; string auth = 5; + string collection = 6; + string replication = 7; } message LookupVolumeRequest { -- cgit v1.2.3 From bc38b72a20bd79bf67ee1770e20dcd538285cedf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 14:38:36 -0800 Subject: s3: implemented DeleteMultipleObjects --- other/java/client/src/main/proto/filer.proto | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6892effe8..d26c5595f 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -24,6 +24,9 @@ service SeaweedFiler { rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } + rpc StreamDeleteEntries (stream DeleteEntryRequest) returns (stream DeleteEntryResponse) { + } + rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { } @@ -147,6 +150,7 @@ message DeleteEntryRequest { } message DeleteEntryResponse { + string error = 1; } message AtomicRenameEntryRequest { -- cgit v1.2.3 From 0841bedb150fb7d4a96c237961474310942c2454 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 17:15:09 -0800 Subject: move filer assign volume grpc errror to response --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index d26c5595f..04901770a 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -180,6 +180,7 @@ message AssignVolumeResponse { string auth = 5; string collection = 6; string replication = 7; + string error = 8; } message LookupVolumeRequest { -- cgit v1.2.3 From 555413d9fc4837302ef1c5b2b921b406c9de6777 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 27 Feb 2020 00:07:13 -0800 Subject: weed queue starts --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 04901770a..9ee552561 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -228,4 +228,5 @@ message GetFilerConfigurationResponse { string collection = 3; uint32 max_mb = 4; string dir_buckets = 5; + string dir_queues = 6; } -- cgit v1.2.3 From 13e215ee5cb5f4c2873f89c263d8c970e9978b19 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2020 00:49:47 -0800 Subject: filer: option to encrypt data on volume server --- other/java/client/src/main/proto/filer.proto | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 9ee552561..5983c84d8 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -99,6 +99,7 @@ message FileChunk { string source_file_id = 6; // to be deprecated FileId fid = 7; FileId source_fid = 8; + bytes cipher_key = 9; } message FileId { @@ -229,4 +230,5 @@ message GetFilerConfigurationResponse { uint32 max_mb = 4; string dir_buckets = 5; string dir_queues = 6; + bool cipher = 7; } -- cgit v1.2.3 From 2e3f6ad3a97bc7fad349e63289695547f92c1f8b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2020 21:39:33 -0700 Subject: filer: remember content is gzipped or not --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 5983c84d8..8df46e917 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -100,6 +100,7 @@ message FileChunk { FileId fid = 7; FileId source_fid = 8; bytes cipher_key = 9; + bool is_gzipped = 10; } message FileId { -- cgit v1.2.3 From de1ba85346bd2af41ebb98f152bb769e2b630139 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 14 Mar 2020 00:27:57 -0700 Subject: HDFS support encrypted data storage --- other/java/client/pom.xml | 15 ++- .../java/seaweedfs/client/FilerGrpcClient.java | 33 ++++- .../src/main/java/seaweedfs/client/Gzip.java | 37 ++++++ .../main/java/seaweedfs/client/SeaweedCipher.java | 55 +++++++++ .../main/java/seaweedfs/client/SeaweedRead.java | 137 +++++++++++++++------ .../main/java/seaweedfs/client/SeaweedWrite.java | 40 +++++- .../java/seaweedfs/client/SeaweedCipherTest.java | 42 +++++++ 7 files changed, 304 insertions(+), 55 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/Gzip.java create mode 100644 other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java create mode 100644 other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java (limited to 'other/java/client') diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 0c585a941..945071336 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -1,10 +1,11 @@ - + 4.0.0 com.github.chrislusf seaweedfs-client - 1.2.4 + 1.2.5 org.sonatype.oss @@ -88,8 +89,8 @@ org.apache.maven.plugins maven-compiler-plugin - 7 - 7 + 8 + 8 @@ -97,9 +98,11 @@ protobuf-maven-plugin 0.6.1 - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + grpc-java - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 3626c76de..3f5d1e8e9 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -14,12 +14,6 @@ import java.util.concurrent.TimeUnit; public class FilerGrpcClient { private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); - - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; - static SslContext sslContext; static { @@ -30,6 +24,14 @@ public class FilerGrpcClient { } } + private final ManagedChannel channel; + private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; + private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; + private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + private boolean cipher = false; + private String collection = ""; + private String replication = ""; + public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); } @@ -42,6 +44,13 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = + this.getBlockingStub().getFilerConfiguration( + FilerProto.GetFilerConfigurationRequest.newBuilder().build()); + cipher = filerConfigurationResponse.getCipher(); + collection = filerConfigurationResponse.getCollection(); + replication = filerConfigurationResponse.getReplication(); + } public FilerGrpcClient(ManagedChannelBuilder channelBuilder) { @@ -51,6 +60,18 @@ public class FilerGrpcClient { futureStub = SeaweedFilerGrpc.newFutureStub(channel); } + public boolean isCipher() { + return cipher; + } + + public String getCollection() { + return collection; + } + + public String getReplication() { + return replication; + } + public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java new file mode 100644 index 000000000..248285dd3 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -0,0 +1,37 @@ +package seaweedfs.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Gzip { + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length); + GZIPOutputStream gzip = new GZIPOutputStream(bos); + gzip.write(data); + gzip.close(); + byte[] compressed = bos.toByteArray(); + bos.close(); + return compressed; + } + + public static byte[] decompress(byte[] compressed) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } + + private static byte[] readAll(InputStream input) throws IOException { + try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + byte[] buffer = new byte[4096]; + int n; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + } + return output.toByteArray(); + } + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java new file mode 100644 index 000000000..8d0ebd755 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -0,0 +1,55 @@ +package seaweedfs.client; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.security.SecureRandom; + +public class SeaweedCipher { + // AES-GCM parameters + public static final int AES_KEY_SIZE = 256; // in bits + public static final int GCM_NONCE_LENGTH = 12; // in bytes + public static final int GCM_TAG_LENGTH = 16; // in bytes + + private static SecureRandom random = new SecureRandom(); + + public static byte[] genCipherKey() throws Exception { + byte[] key = new byte[AES_KEY_SIZE / 8]; + random.nextBytes(key); + return key; + } + + public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception { + return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey); + } + + public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception { + + final byte[] nonce = new byte[GCM_NONCE_LENGTH]; + random.nextBytes(nonce); + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + + Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec); + + byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); + + byte[] iv = AES_cipherInstance.getIV(); + byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); + + return message; + } + + public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception { + final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params); + byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH); + return decryptedText; + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index b08c14467..d2717056f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -6,6 +6,7 @@ import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; import java.io.Closeable; import java.io.IOException; @@ -31,7 +32,7 @@ public class SeaweedRead { } FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); + .getBlockingStub().lookupVolume(lookupRequest.build()); Map vid2Locations = lookupResponse.getLocationsMapMap(); @@ -56,14 +57,18 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + if (chunkView.cipherKey != null) { + return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); + } + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); if (!chunkView.isFullChunk) { request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); + String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); } try { @@ -85,6 +90,44 @@ public class SeaweedRead { } } + private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + HttpClient client = new DefaultHttpClient(); + HttpGet request = new HttpGet( + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + + request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + + byte[] data = null; + + try { + HttpResponse response = client.execute(request); + HttpEntity entity = response.getEntity(); + + data = EntityUtils.toByteArray(entity); + + } finally { + if (client instanceof Closeable) { + Closeable t = (Closeable) client; + t.close(); + } + } + + if (chunkView.isGzipped) { + data = Gzip.decompress(data); + } + + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } + + int len = (int) (chunkView.logicOffset - position + chunkView.size); + System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len); + return len; + + } + protected static List viewFromVisibles(List visibleIntervals, long offset, long size) { List views = new ArrayList<>(); @@ -93,11 +136,13 @@ public class SeaweedRead { if (chunk.start <= offset && offset < chunk.stop && offset < stop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, - isFullChunk + chunk.fileId, + offset - chunk.start, + Math.min(chunk.stop, stop) - offset, + offset, + isFullChunk, + chunk.cipherKey, + chunk.isGzipped )); offset = Math.min(chunk.stop, stop); } @@ -127,11 +172,13 @@ public class SeaweedRead { List newVisibles, FilerProto.FileChunk chunk) { VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - true + chunk.getOffset(), + chunk.getOffset() + chunk.getSize(), + chunk.getFileId(), + chunk.getMtime(), + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsGzipped() ); // easy cases to speed up @@ -147,21 +194,25 @@ public class SeaweedRead { for (VisibleInterval v : visibles) { if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - false + v.start, + chunk.getOffset(), + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } long chunkStop = chunk.getOffset() + chunk.getSize(); if (v.start < chunkStop && chunkStop < v.stop) { newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false + chunkStop, + v.stop, + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -208,24 +259,30 @@ public class SeaweedRead { public final long modifiedTime; public final String fileId; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "VisibleInterval{" + - "start=" + start + - ", stop=" + stop + - ", modifiedTime=" + modifiedTime + - ", fileId='" + fileId + '\'' + - ", isFullChunk=" + isFullChunk + - '}'; + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } @@ -235,24 +292,30 @@ public class SeaweedRead { public final long size; public final long logicOffset; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "ChunkView{" + - "fileId='" + fileId + '\'' + - ", offset=" + offset + - ", size=" + size + - ", logicOffset=" + logicOffset + - ", isFullChunk=" + isFullChunk + - '}'; + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 0663e8d98..06c1bdd9f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,5 +1,6 @@ package seaweedfs.client; +import com.google.protobuf.ByteString; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; @@ -11,9 +12,12 @@ import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; public class SeaweedWrite { + private static SecureRandom random = new SecureRandom(); + public static void writeData(FilerProto.Entry.Builder entry, final String replication, final FilerGrpcClient filerGrpcClient, @@ -22,10 +26,9 @@ public class SeaweedWrite { final long bytesOffset, final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) + .setCollection(filerGrpcClient.getCollection()) + .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) .setDataCenter("") - .setReplication("") .setTtlSec(0) .build()); String fileId = response.getFileId(); @@ -33,7 +36,14 @@ public class SeaweedWrite { String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength); + ByteString cipherKeyString = null; + byte[] cipherKey = null; + if (filerGrpcClient.isCipher()) { + cipherKey = genCipherKey(); + cipherKeyString = ByteString.copyFrom(cipherKey); + } + + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) @@ -41,6 +51,7 @@ public class SeaweedWrite { .setSize(bytesLength) .setMtime(System.currentTimeMillis() / 10000L) .setETag(etag) + .setCipherKey(cipherKeyString) ); } @@ -58,11 +69,22 @@ public class SeaweedWrite { private static String multipartUpload(String targetUrl, String auth, final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { HttpClient client = new DefaultHttpClient(); - InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + InputStream inputStream = null; + if (cipherKey == null) { + inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + } else { + try { + byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey); + inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length); + } catch (Exception e) { + throw new IOException("fail to encrypt data", e); + } + } HttpPost post = new HttpPost(targetUrl); if (auth != null && auth.length() != 0) { @@ -92,4 +114,10 @@ public class SeaweedWrite { } } + + private static byte[] genCipherKey() { + byte[] b = new byte[32]; + random.nextBytes(b); + return b; + } } diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java new file mode 100644 index 000000000..7b5e53e19 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java @@ -0,0 +1,42 @@ +package seaweedfs.client; + +import org.junit.Test; + +import java.util.Base64; + +import static seaweedfs.client.SeaweedCipher.decrypt; +import static seaweedfs.client.SeaweedCipher.encrypt; + +public class SeaweedCipherTest { + + @Test + public void testSameAsGoImplemnetation() throws Exception { + byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + + @Test + public void testEncryptDecrypt() throws Exception { + byte[] secretKey = SeaweedCipher.genCipherKey(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + +} -- cgit v1.2.3 From c4bea45099a3768dae7ea683afa16f2154b01ffb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 20 Mar 2020 14:17:31 -0700 Subject: S3 API: fix DeleteMultipleObjectsHandler fix https://github.com/chrislusf/seaweedfs/issues/1241 --- other/java/client/src/main/proto/filer.proto | 3 --- 1 file changed, 3 deletions(-) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 8df46e917..b998c330c 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -24,9 +24,6 @@ service SeaweedFiler { rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } - rpc StreamDeleteEntries (stream DeleteEntryRequest) returns (stream DeleteEntryResponse) { - } - rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { } -- cgit v1.2.3 From 3d3bab2447c5524e2db67b4e4b4f077d9b387f5a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2020 23:43:24 -0700 Subject: add notes --- other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 06c1bdd9f..5bf24ef68 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -45,6 +45,8 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); + // TODO: cache fileId ~ bytes here + entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) .setOffset(offset) -- cgit v1.2.3 From ae2309dc58dbfb231fbf06ed7e16055a4fbc5df9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2020 23:58:59 -0700 Subject: Update SeaweedRead.java --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index d2717056f..8e850f87e 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -61,6 +61,8 @@ public class SeaweedRead { return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); } + // TODO: read the chunk and returns the chunk view data here + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); -- cgit v1.2.3 From b656e05aaf71208a7c146839b6432fe2aaaea5a6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 29 Mar 2020 00:55:40 -0700 Subject: HDFS: support chunk cache --- .../src/main/java/seaweedfs/client/ChunkCache.java | 27 +++++++++++++ .../main/java/seaweedfs/client/SeaweedRead.java | 45 ++++++---------------- .../main/java/seaweedfs/client/SeaweedWrite.java | 3 +- 3 files changed, 41 insertions(+), 34 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/ChunkCache.java (limited to 'other/java/client') diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java new file mode 100644 index 000000000..e249d4524 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -0,0 +1,27 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class ChunkCache { + + private final Cache cache; + + public ChunkCache(int maxEntries) { + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(1, TimeUnit.HOURS) + .build(); + } + + public byte[] getChunk(String fileId) { + return this.cache.getIfPresent(fileId); + } + + public void setChunk(String fileId, byte[] data) { + this.cache.put(fileId, data); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 8e850f87e..ad92ba006 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -10,14 +10,14 @@ import org.apache.http.util.EntityUtils; import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + static ChunkCache chunkCache = new ChunkCache(1000); + // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, @@ -57,42 +57,23 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { - if (chunkView.cipherKey != null) { - return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); - } - - // TODO: read the chunk and returns the chunk view data here - HttpClient client = new DefaultHttpClient(); - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + byte[] chunkData = chunkCache.getChunk(chunkView.fileId); - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); + if (chunkData == null) { + chunkData = doFetchFullChunkData(chunkView, locations); } - try { - HttpResponse response = client.execute(request); - HttpEntity entity = response.getEntity(); - - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); + int len = (int) (chunkView.logicOffset - position + chunkView.size); + System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); - return len; + chunkCache.setChunk(chunkView.fileId, chunkData); - } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); - } - } + return len; } - private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); @@ -124,9 +105,7 @@ public class SeaweedRead { throw new IOException("fail to decrypt", e); } - int len = (int) (chunkView.logicOffset - position + chunkView.size); - System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len); - return len; + return data; } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 5bf24ef68..178234d5a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -45,7 +45,8 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - // TODO: cache fileId ~ bytes here + // cache fileId ~ bytes + SeaweedRead.chunkCache.setChunk(fileId, bytes); entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) -- cgit v1.2.3 From 5ebf232e793e0fd251b2cce9c1e7e311adc356e1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 29 Mar 2020 01:40:04 -0700 Subject: HDFS: 1.2.6 --- other/java/client/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client') diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 945071336..a74c697e2 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.2.5 + 1.2.6 org.sonatype.oss -- cgit v1.2.3 From 50a5018b7fb259cee84471ce8d7bb6e554602c61 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 30 Mar 2020 01:19:33 -0700 Subject: writing meta logs is working --- other/java/client/src/main/proto/filer.proto | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index b998c330c..3975b517d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -42,6 +42,9 @@ service SeaweedFiler { rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } + rpc ListenForEvents (ListenForEventsRequest) returns (stream FullEventNotification) { + } + } ////////////////////////////////////////////////// @@ -230,3 +233,19 @@ message GetFilerConfigurationResponse { string dir_queues = 6; bool cipher = 7; } + +message ListenForEventsRequest { + string client_name = 1; + string directory = 2; + int64 since_sec = 3; +} +message FullEventNotification { + string directory = 1; + EventNotification event_notification = 2; +} + +message LogEntry { + int64 ts_ns = 1; + int32 partition_key_hash = 2; + bytes data = 3; +} -- cgit v1.2.3 From bf270d9e8c01052409464193b693d50fa09a70a9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Apr 2020 00:51:16 -0700 Subject: filer: able to tail meta data changes --- other/java/client/src/main/proto/filer.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 3975b517d..12f1cd85b 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -237,7 +237,7 @@ message GetFilerConfigurationResponse { message ListenForEventsRequest { string client_name = 1; string directory = 2; - int64 since_sec = 3; + int64 since_ns = 3; } message FullEventNotification { string directory = 1; -- cgit v1.2.3 From 2d43f85577cb998656abf2525445b72fcd8cc48d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Apr 2020 12:51:21 -0700 Subject: watch entries with common path prefix --- other/java/client/src/main/proto/filer.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 12f1cd85b..e504e4f84 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -236,7 +236,7 @@ message GetFilerConfigurationResponse { message ListenForEventsRequest { string client_name = 1; - string directory = 2; + string path_prefix = 2; int64 since_ns = 3; } message FullEventNotification { -- cgit v1.2.3 From ec2eb8bc4804f9b880f256a55e3cbfc0923b6a29 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 8 Apr 2020 08:12:00 -0700 Subject: add If-None-Match and If-Modified-Since fix https://github.com/chrislusf/seaweedfs/issues/1269 --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index e504e4f84..8f8176ff4 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -123,6 +123,7 @@ message FuseAttributes { string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; + bytes md5 = 14; } message CreateEntryRequest { -- cgit v1.2.3 From 6f948e48879ed0706b044bea0429d3fc48d6e8e1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 13:07:59 -0700 Subject: remove configurable topics folder location --- other/java/client/src/main/proto/filer.proto | 1 - 1 file changed, 1 deletion(-) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 8f8176ff4..40316f58b 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -231,7 +231,6 @@ message GetFilerConfigurationResponse { string collection = 3; uint32 max_mb = 4; string dir_buckets = 5; - string dir_queues = 6; bool cipher = 7; } -- cgit v1.2.3 From 7764e0465ce976bb528c27bb9aa25857102570ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 21:00:55 -0700 Subject: refactoring --- other/java/client/src/main/proto/filer.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 40316f58b..fd2b8ebe3 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -42,7 +42,7 @@ service SeaweedFiler { rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } - rpc ListenForEvents (ListenForEventsRequest) returns (stream FullEventNotification) { + rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { } } @@ -234,12 +234,12 @@ message GetFilerConfigurationResponse { bool cipher = 7; } -message ListenForEventsRequest { +message SubscribeMetadataRequest { string client_name = 1; string path_prefix = 2; int64 since_ns = 3; } -message FullEventNotification { +message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; } -- cgit v1.2.3 From bda82f61bc847d6d02ebd9b242c07e01588b4e30 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 17 Apr 2020 02:28:09 -0700 Subject: filer: able to append to a file --- other/java/client/src/main/proto/filer.proto | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index fd2b8ebe3..bc159fd14 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -21,6 +21,9 @@ service SeaweedFiler { rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) { } + rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) { + } + rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } @@ -143,6 +146,14 @@ message UpdateEntryRequest { message UpdateEntryResponse { } +message AppendToEntryRequest { + string directory = 1; + string entry_name = 2; + repeated FileChunk chunks = 3; +} +message AppendToEntryResponse { +} + message DeleteEntryRequest { string directory = 1; string name = 2; -- cgit v1.2.3 From e24b25de784daf42a15daf573249d608ebc2b44a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 21 Apr 2020 21:16:13 -0700 Subject: async meta caching: can stream updates now --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index bc159fd14..6d890861d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -253,6 +253,7 @@ message SubscribeMetadataRequest { message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; + int64 ts_ns = 3; } message LogEntry { -- cgit v1.2.3 From b52b8ec68553781408d1ac8c6f7a2cd4d935aea6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 Apr 2020 05:21:54 -0700 Subject: Hadoop: fix entry not found for HCFS also fix cipher related changes. --- .../client/src/main/java/seaweedfs/client/FilerClient.java | 9 ++++++--- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 10 ++++++---- .../client/src/main/java/seaweedfs/client/SeaweedWrite.java | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) (limited to 'other/java/client') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 84aa26ad9..ef32c7e9a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -14,7 +14,7 @@ public class FilerClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private FilerGrpcClient filerGrpcClient; + private final FilerGrpcClient filerGrpcClient; public FilerClient(String host, int grpcPort) { filerGrpcClient = new FilerGrpcClient(host, grpcPort); @@ -181,7 +181,7 @@ public class FilerClient { .setLimit(limit) .build()); List entries = new ArrayList<>(); - while (iter.hasNext()){ + while (iter.hasNext()) { FilerProto.ListEntriesResponse resp = iter.next(); entries.add(fixEntryAfterReading(resp.getEntry())); } @@ -195,9 +195,12 @@ public class FilerClient { .setDirectory(directory) .setName(entryName) .build()).getEntry(); + if (entry == null) { + return null; + } return fixEntryAfterReading(entry); } catch (Exception e) { - if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){ + if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) { return null; } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index ad92ba006..1e4a158c6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -99,10 +99,12 @@ public class SeaweedRead { data = Gzip.decompress(data); } - try { - data = SeaweedCipher.decrypt(data, chunkView.cipherKey); - } catch (Exception e) { - throw new IOException("fail to decrypt", e); + if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } } return data; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 178234d5a..dc6203e52 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -36,7 +36,7 @@ public class SeaweedWrite { String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - ByteString cipherKeyString = null; + ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; if (filerGrpcClient.isCipher()) { cipherKey = genCipherKey(); @@ -78,7 +78,7 @@ public class SeaweedWrite { HttpClient client = new DefaultHttpClient(); InputStream inputStream = null; - if (cipherKey == null) { + if (cipherKey == null || cipherKey.length == 0) { inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); } else { try { -- cgit v1.2.3 From 972f437225ca9eda119c29868bf44a21d98e3996 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 Apr 2020 05:44:56 -0700 Subject: HCFS 1.2.7 --- other/java/client/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client') diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index a74c697e2..bea129db7 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.2.6 + 1.2.7 org.sonatype.oss -- cgit v1.2.3 From 1e3e4b3072071341b4bb4b0bb7c611457e927f97 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 5 May 2020 02:05:28 -0700 Subject: add broker connects to filer --- other/java/client/src/main/proto/filer.proto | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6d890861d..3b3b78bbb 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -48,6 +48,8 @@ service SeaweedFiler { rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { } + rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { + } } ////////////////////////////////////////////////// @@ -261,3 +263,10 @@ message LogEntry { int32 partition_key_hash = 2; bytes data = 3; } + +message KeepConnectedRequest { + string name = 1; + uint32 grpc_port = 2; +} +message KeepConnectedResponse { +} -- cgit v1.2.3 From dfccc3c2637693dce141c27a321ba5d3aea1ace9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 8 May 2020 02:47:22 -0700 Subject: able to read chan and write chan --- other/java/client/src/main/proto/filer.proto | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'other/java/client') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 3b3b78bbb..1fc8ef63d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -50,6 +50,10 @@ service SeaweedFiler { rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { } + + rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { + } + } ////////////////////////////////////////////////// @@ -267,6 +271,21 @@ message LogEntry { message KeepConnectedRequest { string name = 1; uint32 grpc_port = 2; + repeated string resources = 3; } message KeepConnectedResponse { } + +message LocateBrokerRequest { + string resource = 1; +} +message LocateBrokerResponse { + bool found = 1; + // if found, send the exact address + // if not found, send the full list of existing brokers + message Resource { + string grpc_addresses = 1; + int32 resource_count = 2; + } + repeated Resource resources = 2; +} -- cgit v1.2.3 From 8dfaaeabfd97efff0e17af5cdf5b2d647bdb9b88 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 May 2020 17:07:34 -0700 Subject: HCFS: 1.2.8 fix hbase related bugs 1. SeaweedFileSystem.listStatus need to work with file also 2. SeaweedRead readChunkView has wrong len --- other/java/client/pom.xml | 2 +- other/java/client/pom_debug.xml | 139 +++++++++++++++++++++ .../main/java/seaweedfs/client/SeaweedRead.java | 8 +- 3 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 other/java/client/pom_debug.xml (limited to 'other/java/client') diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index bea129db7..a8b561251 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.2.7 + 1.2.8 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml new file mode 100644 index 000000000..88447f7e7 --- /dev/null +++ b/other/java/client/pom_debug.xml @@ -0,0 +1,139 @@ + + + 4.0.0 + + com.github.chrislusf + seaweedfs-client + 1.2.8 + + + org.sonatype.oss + oss-parent + 9 + + + + 3.9.1 + + 1.23.0 + 28.0-jre + + + + + com.moandjiezana.toml + toml4j + 0.7.2 + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + com.google.guava + guava + ${guava.version} + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + org.slf4j + slf4j-api + 1.7.25 + + + org.apache.httpcomponents + httpmime + 4.5.6 + + + junit + junit + 4.12 + test + + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9.1 + + + attach-javadocs + + jar + + + + + + + + diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 1e4a158c6..7be39da53 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -7,6 +7,8 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -14,7 +16,7 @@ import java.util.*; public class SeaweedRead { - // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); static ChunkCache chunkCache = new ChunkCache(1000); @@ -64,7 +66,9 @@ public class SeaweedRead { chunkData = doFetchFullChunkData(chunkView, locations); } - int len = (int) (chunkView.logicOffset - position + chunkView.size); + int len = (int) chunkView.size; + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", + chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); chunkCache.setChunk(chunkView.fileId, chunkData); -- cgit v1.2.3