From 74fb237727267aa482ee07851f454ca03fbd1fdf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 15 Feb 2019 00:09:19 -0800 Subject: benchmark can work in secure mode --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6cd4df6b4..5cdcb6a97 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -139,6 +139,7 @@ message AssignVolumeResponse { string url = 2; string public_url = 3; int32 count = 4; + string auth = 5; } message LookupVolumeRequest { -- cgit v1.2.3 From 58d4088db4bb5a5d56f695d67259ad02755f1fe5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 19 Feb 2019 11:57:25 -0800 Subject: HDFS: add tls secured grpc --- .../java/seaweedfs/client/FilerGrpcClient.java | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 16b7c3249..c28c1dcf2 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -2,7 +2,14 @@ package seaweedfs.client; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import javax.net.ssl.SSLException; +import java.io.File; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -20,6 +27,16 @@ public class FilerGrpcClient { this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()); } + public FilerGrpcClient(String host, int grpcPort, + String caFilePath, + String clientCertFilePath, + String clientPrivateKeyFilePath) throws SSLException { + + this(NettyChannelBuilder.forAddress(host, grpcPort) + .negotiationType(NegotiationType.TLS) + .sslContext(buildSslContext(caFilePath,clientCertFilePath,clientPrivateKeyFilePath))); + } + public FilerGrpcClient(ManagedChannelBuilder channelBuilder) { channel = channelBuilder.build(); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); @@ -42,4 +59,18 @@ public class FilerGrpcClient { public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { return futureStub; } + + private static SslContext buildSslContext(String trustCertCollectionFilePath, + String clientCertChainFilePath, + String clientPrivateKeyFilePath) throws SSLException { + SslContextBuilder builder = GrpcSslContexts.forClient(); + if (trustCertCollectionFilePath != null) { + builder.trustManager(new File(trustCertCollectionFilePath)); + } + if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) { + builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath)); + } + return builder.build(); + } + } -- cgit v1.2.3 From 6ed69de6bd5dcabc6fa70185bfcb772786b27517 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 19 Feb 2019 14:26:59 -0800 Subject: HDFS: add jwt auth for uploading --- .../main/java/seaweedfs/client/SeaweedWrite.java | 45 ++++++++++++---------- 1 file changed, 25 insertions(+), 20 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index a7cede09f..15db87195 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -20,25 +20,26 @@ public class SeaweedWrite { final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) - .setDataCenter("") - .setReplication("") - .setTtlSec(0) - .build()); + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection("") + .setReplication(replication) + .setDataCenter("") + .setReplication("") + .setTtlSec(0) + .build()); String fileId = response.getFileId(); String url = response.getUrl(); + String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength); + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength); entry.addChunks(FilerProto.FileChunk.newBuilder() - .setFileId(fileId) - .setOffset(offset) - .setSize(bytesLength) - .setMtime(System.currentTimeMillis() / 10000L) - .setETag(etag) + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) ); } @@ -46,14 +47,15 @@ public class SeaweedWrite { public static void writeMeta(final FilerGrpcClient filerGrpcClient, final String parentDirectory, final FilerProto.Entry.Builder entry) { filerGrpcClient.getBlockingStub().createEntry( - FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parentDirectory) - .setEntry(entry) - .build() + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parentDirectory) + .setEntry(entry) + .build() ); } private static String multipartUpload(String targetUrl, + String auth, final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { @@ -62,11 +64,14 @@ public class SeaweedWrite { InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); HttpPost post = new HttpPost(targetUrl); + if (auth != null && auth.length() != 0) { + post.addHeader("Authorization", "BEARER " + auth); + } post.setEntity(MultipartEntityBuilder.create() - .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) - .addBinaryBody("upload", inputStream) - .build()); + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("upload", inputStream) + .build()); try { HttpResponse response = client.execute(post); -- cgit v1.2.3 From 97406333a5ecc5b0d2cdaa74ff9901e3100e4bf2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 30 Mar 2019 23:08:29 -0700 Subject: support atomic renaming for mysql/postgres filer store --- .../main/java/seaweedfs/client/FilerClient.java | 141 +++++++++++++-------- other/java/client/src/main/proto/filer.proto | 13 ++ 2 files changed, 98 insertions(+), 56 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 63d0d8320..562a36894 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -51,12 +51,26 @@ public class FilerClient { } return createEntry( - parent, - newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() + parent, + newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() ); } + public boolean mv(String oldPath, String newPath) { + + Path oldPathObject = Paths.get(oldPath); + String oldParent = oldPathObject.getParent().toString(); + String oldName = oldPathObject.getFileName().toString(); + + Path newPathObject = Paths.get(newPath); + String newParent = newPathObject.getParent().toString(); + String newName = newPathObject.getFileName().toString(); + + return atomicRenameEntry(oldParent, oldName, newParent, newName); + + } + public boolean rm(String path, boolean isRecursive) { Path pathObject = Paths.get(path); @@ -64,10 +78,10 @@ public class FilerClient { String name = pathObject.getFileName().toString(); return deleteEntry( - parent, - name, - true, - isRecursive); + parent, + name, + true, + isRecursive); } public boolean touch(String path, int mode) { @@ -84,18 +98,18 @@ public class FilerClient { FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { return createEntry( - parent, - newFileEntry(name, mode, uid, gid, userName, groupNames).build() + parent, + newFileEntry(name, mode, uid, gid, userName, groupNames).build() ); } long now = System.currentTimeMillis() / 1000L; FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder() - .setMtime(now) - .setUid(uid) - .setGid(gid) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames)); + .setMtime(now) + .setUid(uid) + .setGid(gid) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames)); return updateEntry(parent, entry.toBuilder().setAttributes(attr).build()); } @@ -105,17 +119,17 @@ public class FilerClient { long now = System.currentTimeMillis() / 1000L; return FilerProto.Entry.newBuilder() - .setName(name) - .setIsDirectory(true) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setUid(uid) - .setGid(gid) - .setFileMode(mode | 1 << 31) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames))); + .setName(name) + .setIsDirectory(true) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode | 1 << 31) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); } public FilerProto.Entry.Builder newFileEntry(String name, int mode, @@ -124,17 +138,17 @@ public class FilerClient { long now = System.currentTimeMillis() / 1000L; return FilerProto.Entry.newBuilder() - .setName(name) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setUid(uid) - .setGid(gid) - .setFileMode(mode) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames))); + .setName(name) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); } public List listEntries(String path) { @@ -160,20 +174,20 @@ public class FilerClient { public List listEntries(String path, String entryPrefix, String lastEntryName, int limit) { return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() - .setDirectory(path) - .setPrefix(entryPrefix) - .setStartFromFileName(lastEntryName) - .setLimit(limit) - .build()).getEntriesList(); + .setDirectory(path) + .setPrefix(entryPrefix) + .setStartFromFileName(lastEntryName) + .setLimit(limit) + .build()).getEntriesList(); } public FilerProto.Entry lookupEntry(String directory, String entryName) { try { return filerGrpcClient.getBlockingStub().lookupDirectoryEntry( - FilerProto.LookupDirectoryEntryRequest.newBuilder() - .setDirectory(directory) - .setName(entryName) - .build()).getEntry(); + FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(directory) + .setName(entryName) + .build()).getEntry(); } catch (Exception e) { LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); return null; @@ -184,9 +198,9 @@ public class FilerClient { public boolean createEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; @@ -197,9 +211,9 @@ public class FilerClient { public boolean updateEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; @@ -210,11 +224,11 @@ public class FilerClient { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) { try { filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() - .setDirectory(parent) - .setName(entryName) - .setIsDeleteData(isDeleteFileChunk) - .setIsRecursive(isRecursive) - .build()); + .setDirectory(parent) + .setName(entryName) + .setIsDeleteData(isDeleteFileChunk) + .setIsRecursive(isRecursive) + .build()); } catch (Exception e) { LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e); return false; @@ -222,4 +236,19 @@ public class FilerClient { return true; } + public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { + try { + filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + .setOldDirectory(oldParent) + .setOldName(oldName) + .setNewDirectory(newParent) + .setNewName(newName) + .build()); + } catch (Exception e) { + LOG.warn("atomicRenameEntry {}/{} => {}/{}: {}", oldParent, oldName, newParent, newName, e); + return false; + } + return true; + } + } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 5cdcb6a97..07c73f1d4 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -24,6 +24,9 @@ service SeaweedFiler { rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } + rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { + } + rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) { } @@ -126,6 +129,16 @@ message DeleteEntryRequest { message DeleteEntryResponse { } +message AtomicRenameEntryRequest { + string old_directory = 1; + string old_name = 2; + string new_directory = 3; + string new_name = 4; +} + +message AtomicRenameEntryResponse { +} + message AssignVolumeRequest { int32 count = 1; string collection = 2; -- cgit v1.2.3 From b3b42bc947ec44acdc69efdeebb623a0c092078a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 16 Apr 2019 00:44:31 -0700 Subject: replicate need to include new entry path --- other/java/client/src/main/proto/filer.proto | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 07c73f1d4..350288b53 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -72,10 +72,16 @@ message Entry { map extended = 5; } +message FullEntry { + string dir = 1; + Entry entry = 2; +} + message EventNotification { Entry old_entry = 1; Entry new_entry = 2; bool delete_chunks = 3; + string new_parent_path = 4; } message FileChunk { -- cgit v1.2.3 From 82b0759493470417c0eb7135ed4a9b0e530fd9a3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 17 May 2019 02:03:23 -0700 Subject: filer: migrating filer store from persisting shorter structured file id instead of a string --- .../main/java/seaweedfs/client/FilerClient.java | 30 ++++++++++++++++++++-- other/java/client/src/main/proto/filer.proto | 12 +++++++-- 2 files changed, 38 insertions(+), 4 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 562a36894..f4bd0944b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -173,21 +173,27 @@ public class FilerClient { } public List listEntries(String path, String entryPrefix, String lastEntryName, int limit) { - return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + List entries = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) .setLimit(limit) .build()).getEntriesList(); + List fixedEntries = new ArrayList<>(entries.size()); + for (FilerProto.Entry entry : entries) { + fixedEntries.add(fixEntryAfterReading(entry)); + } + return fixedEntries; } public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - return filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( FilerProto.LookupDirectoryEntryRequest.newBuilder() .setDirectory(directory) .setName(entryName) .build()).getEntry(); + return fixEntryAfterReading(entry); } catch (Exception e) { LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); return null; @@ -251,4 +257,24 @@ public class FilerClient { return true; } + private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) { + if (entry.getChunksList().size() <= 0) { + return entry; + } + String fileId = entry.getChunks(0).getFileId(); + if (fileId != null && fileId.length() != 0) { + return entry; + } + FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); + entryBuilder.clearChunks(); + for (FilerProto.FileChunk chunk : entry.getChunksList()) { + FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); + FilerProto.FileId fid = chunk.getFid(); + fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); + chunkBuilder.setFileId(fileId); + entryBuilder.addChunks(chunkBuilder); + } + return entryBuilder.build(); + } + } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 350288b53..56814c39a 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -85,12 +85,20 @@ message EventNotification { } message FileChunk { - string file_id = 1; + string file_id = 1; // to be deprecated int64 offset = 2; uint64 size = 3; int64 mtime = 4; string e_tag = 5; - string source_file_id = 6; + string source_file_id = 6; // to be deprecated + FileId fid = 7; + FileId source_fid = 8; +} + +message FileId { + uint32 volume_id = 1; + uint64 file_key = 2; + fixed32 cookie = 3; } message FuseAttributes { -- cgit v1.2.3 From 8da5d5b094f898c0b9c75cbccb63e0b0a125802e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 23 Jun 2019 01:57:35 -0700 Subject: filer.copy: use filer settings, avoid unnecessary command line options fix https://github.com/chrislusf/seaweedfs/issues/968 --- other/java/client/src/main/proto/filer.proto | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 56814c39a..d72bced12 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -39,6 +39,9 @@ service SeaweedFiler { rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { } + rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { + } + } ////////////////////////////////////////////////// @@ -205,3 +208,12 @@ message StatisticsResponse { uint64 used_size = 5; uint64 file_count = 6; } + +message GetFilerConfigurationRequest { +} +message GetFilerConfigurationResponse { + repeated string masters = 1; + string replication = 2; + string collection = 3; + uint32 max_mb = 4; +} -- cgit v1.2.3 From 170ee6ef0f9a94504580db5fa8c82e4ef6d50a99 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 29 Aug 2019 23:29:10 -0700 Subject: tmp --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index a906a689b..a307983bb 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -86,7 +86,7 @@ public class SeaweedRead { return 0; } - public static List viewFromVisibles(List visibleIntervals, long offset, long size) { + protected static List viewFromVisibles(List visibleIntervals, long offset, long size) { List views = new ArrayList<>(); long stop = offset + size; -- cgit v1.2.3 From cb299dfaa279e14def8bf3f26816913213a91097 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 1 Sep 2019 03:46:51 -0700 Subject: HCFS: use latest grpc versions, separate hadoop2 and hadoop3 --- .../java/seaweedfs/client/FilerGrpcClient.java | 46 +++++++--------- .../java/seaweedfs/client/FilerSslContext.java | 64 ++++++++++++++++++++++ .../java/seaweedfs/client/SeaweedFilerTest.java | 17 ++++++ 3 files changed, 101 insertions(+), 26 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java create mode 100644 other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index c28c1dcf2..3626c76de 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -2,39 +2,46 @@ package seaweedfs.client; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; -import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; -import java.io.File; import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; public class FilerGrpcClient { - private static final Logger logger = Logger.getLogger(FilerGrpcClient.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + static SslContext sslContext; + + static { + try { + sslContext = FilerSslContext.loadSslContext(); + } catch (SSLException e) { + logger.warn("failed to load ssl context", e); + } + } public FilerGrpcClient(String host, int grpcPort) { - this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()); + this(host, grpcPort, sslContext); } - public FilerGrpcClient(String host, int grpcPort, - String caFilePath, - String clientCertFilePath, - String clientPrivateKeyFilePath) throws SSLException { + public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) { + + this(sslContext == null ? + ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() : + NettyChannelBuilder.forAddress(host, grpcPort) + .negotiationType(NegotiationType.TLS) + .sslContext(sslContext)); - this(NettyChannelBuilder.forAddress(host, grpcPort) - .negotiationType(NegotiationType.TLS) - .sslContext(buildSslContext(caFilePath,clientCertFilePath,clientPrivateKeyFilePath))); } public FilerGrpcClient(ManagedChannelBuilder channelBuilder) { @@ -60,17 +67,4 @@ public class FilerGrpcClient { return futureStub; } - private static SslContext buildSslContext(String trustCertCollectionFilePath, - String clientCertChainFilePath, - String clientPrivateKeyFilePath) throws SSLException { - SslContextBuilder builder = GrpcSslContexts.forClient(); - if (trustCertCollectionFilePath != null) { - builder.trustManager(new File(trustCertCollectionFilePath)); - } - if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) { - builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath)); - } - return builder.build(); - } - } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java b/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java new file mode 100644 index 000000000..5a88c1da3 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java @@ -0,0 +1,64 @@ +package seaweedfs.client; + +import com.google.common.base.Strings; +import com.moandjiezana.toml.Toml; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; +import java.io.File; + +public class FilerSslContext { + + private static final Logger logger = LoggerFactory.getLogger(FilerSslContext.class); + + public static SslContext loadSslContext() throws SSLException { + String securityFileName = "security.toml"; + String home = System.getProperty("user.home"); + File f1 = new File("./"+securityFileName); + File f2 = new File(home + "/.seaweedfs/"+securityFileName); + File f3 = new File(home + "/etc/seaweedfs/"+securityFileName); + + File securityFile = f1.exists()? f1 : f2.exists() ? f2 : f3.exists()? f3 : null; + + if (securityFile==null){ + return null; + } + + Toml toml = new Toml().read(securityFile); + logger.debug("reading ssl setup from {}", securityFile); + + String trustCertCollectionFilePath = toml.getString("grpc.ca"); + logger.debug("loading ca from {}", trustCertCollectionFilePath); + String clientCertChainFilePath = toml.getString("grpc.client.cert"); + logger.debug("loading client ca from {}", clientCertChainFilePath); + String clientPrivateKeyFilePath = toml.getString("grpc.client.key"); + logger.debug("loading client key from {}", clientPrivateKeyFilePath); + + if (Strings.isNullOrEmpty(clientPrivateKeyFilePath) && Strings.isNullOrEmpty(clientPrivateKeyFilePath)){ + return null; + } + + // possibly fix the format https://netty.io/wiki/sslcontextbuilder-and-private-key.html + + return buildSslContext(trustCertCollectionFilePath, clientCertChainFilePath, clientPrivateKeyFilePath); + } + + + private static SslContext buildSslContext(String trustCertCollectionFilePath, + String clientCertChainFilePath, + String clientPrivateKeyFilePath) throws SSLException { + SslContextBuilder builder = GrpcSslContexts.forClient(); + if (trustCertCollectionFilePath != null) { + builder.trustManager(new File(trustCertCollectionFilePath)); + } + if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) { + builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath)); + } + return builder.trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + } +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java new file mode 100644 index 000000000..dde23ee87 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java @@ -0,0 +1,17 @@ +package seaweedfs.client; + +import java.util.List; + +public class SeaweedFilerTest { + public static void main(String[] args){ + + FilerClient filerClient = new FilerClient("localhost", 18888); + + List entries = filerClient.listEntries("/"); + + for (FilerProto.Entry entry : entries) { + System.out.println(entry.toString()); + } + + } +} -- cgit v1.2.3 From af8b413a9cebea0109ebdf16bd18078da3c551a1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 1 Sep 2019 12:25:06 -0700 Subject: HCFS: fix mkdirs NPE --- other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 7 +++---- .../client/src/test/java/seaweedfs/client/SeaweedFilerTest.java | 6 ++++++ 2 files changed, 9 insertions(+), 4 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index f4bd0944b..2a40f3200 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -34,13 +34,12 @@ public class FilerClient { public boolean mkdirs(String path, int mode, int uid, int gid, String userName, String[] groupNames) { - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); - if ("/".equals(path)) { return true; } + Path pathObject = Paths.get(path); + String parent = pathObject.getParent().toString(); + String name = pathObject.getFileName().toString(); mkdirs(parent, mode, uid, gid, userName, groupNames); diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java index dde23ee87..87165af0c 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java @@ -13,5 +13,11 @@ public class SeaweedFilerTest { System.out.println(entry.toString()); } + filerClient.mkdirs("/new_folder", 0755); + filerClient.touch("/new_folder/new_empty_file", 0755); + filerClient.touch("/new_folder/new_empty_file2", 0755); + filerClient.rm("/new_folder/new_empty_file", false); + filerClient.rm("/new_folder", true); + } } -- cgit v1.2.3 From 60c9215a00d9b72be6f7d0e5382227d2b57de9d5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 1 Sep 2019 21:40:26 -0700 Subject: HCFS: avoid verbose file not found exception stack trace --- other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 3 +++ 1 file changed, 3 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 2a40f3200..1ea4cb889 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -194,6 +194,9 @@ public class FilerClient { .build()).getEntry(); return fixEntryAfterReading(entry); } catch (Exception e) { + if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){ + return null; + } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); return null; } -- cgit v1.2.3 From ae53f636804e41c2c7a0817e8f35434a00b6eacb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 11 Sep 2019 20:26:20 -0700 Subject: filer: recursive deletion optionally ignoring any errors fix https://github.com/chrislusf/seaweedfs/issues/1062 --- other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 8 +++++--- other/java/client/src/main/proto/filer.proto | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 1ea4cb889..a1e3cdb89 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -70,7 +70,7 @@ public class FilerClient { } - public boolean rm(String path, boolean isRecursive) { + public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) { Path pathObject = Paths.get(path); String parent = pathObject.getParent().toString(); @@ -80,7 +80,8 @@ public class FilerClient { parent, name, true, - isRecursive); + isRecursive, + ignoreRecusiveError); } public boolean touch(String path, int mode) { @@ -229,13 +230,14 @@ public class FilerClient { return true; } - public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) { + public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { try { filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() .setDirectory(parent) .setName(entryName) .setIsDeleteData(isDeleteFileChunk) .setIsRecursive(isRecursive) + .setIgnoreRecursiveError(ignoreRecusiveError) .build()); } catch (Exception e) { LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e); diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index d72bced12..18ccca44f 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -141,6 +141,7 @@ message DeleteEntryRequest { // bool is_directory = 3; bool is_delete_data = 4; bool is_recursive = 5; + bool ignore_recursive_error = 6; } message DeleteEntryResponse { -- cgit v1.2.3 From a999ed94d0abe53ac444c6a12cf0f5f89ea06d33 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 8 Oct 2019 22:49:10 -0700 Subject: update hdfs client --- .../java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java index 87165af0c..eaf17e5c6 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java @@ -16,8 +16,8 @@ public class SeaweedFilerTest { filerClient.mkdirs("/new_folder", 0755); filerClient.touch("/new_folder/new_empty_file", 0755); filerClient.touch("/new_folder/new_empty_file2", 0755); - filerClient.rm("/new_folder/new_empty_file", false); - filerClient.rm("/new_folder", true); + filerClient.rm("/new_folder/new_empty_file", false, true); + filerClient.rm("/new_folder", true, true); } } -- cgit v1.2.3 From 15f968b0546cdf9d5ee6215ff967de0b3ea4b552 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 9 Oct 2019 18:09:08 -0700 Subject: HCFS: avoid possible jar problem --- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index a307983bb..de8da28d7 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -5,16 +5,12 @@ import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.DefaultHttpClient; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; +import java.util.*; public class SeaweedRead { @@ -59,7 +55,7 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) { - HttpClient client = HttpClientBuilder.create().build(); + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); -- cgit v1.2.3 From 7057e7a076b109e5d3e9140b27fa4e8beabcbc71 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 11 Oct 2019 21:44:44 -0700 Subject: HCFS: 1.22 fix put gives java.lang.NoSuchFieldError: INSTANCE related to Cloudera CDH Hadoop https://github.com/chrislusf/seaweedfs/issues/1080 --- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 13 ++++++++----- .../client/src/main/java/seaweedfs/client/SeaweedWrite.java | 12 ++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index de8da28d7..2efa64580 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -7,6 +7,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; +import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -19,7 +20,7 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength) { + final int bufferLength) throws IOException { List chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); @@ -54,7 +55,7 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) { + private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); @@ -76,10 +77,12 @@ public class SeaweedRead { return len; - } catch (IOException e) { - e.printStackTrace(); + } finally { + if (client instanceof Closeable) { + Closeable t = (Closeable) client; + t.close(); + } } - return 0; } protected static List viewFromVisibles(List visibleIntervals, long offset, long size) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 15db87195..0663e8d98 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,13 +1,14 @@ package seaweedfs.client; import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.DefaultHttpClient; import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -59,7 +60,7 @@ public class SeaweedWrite { final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { - CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build(); + HttpClient client = new DefaultHttpClient(); InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); @@ -84,7 +85,10 @@ public class SeaweedWrite { return etag; } finally { - client.close(); + if (client instanceof Closeable) { + Closeable t = (Closeable) client; + t.close(); + } } } -- cgit v1.2.3 From 0fa1269bc77abe30f4d108a88a97e29e1bca3124 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Dec 2019 00:22:37 -0800 Subject: filer: streaming file listing --- other/java/client/src/main/proto/filer.proto | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 18ccca44f..ef847cbe7 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -12,7 +12,7 @@ service SeaweedFiler { rpc LookupDirectoryEntry (LookupDirectoryEntryRequest) returns (LookupDirectoryEntryResponse) { } - rpc ListEntries (ListEntriesRequest) returns (ListEntriesResponse) { + rpc ListEntries (ListEntriesRequest) returns (stream ListEntriesResponse) { } rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) { @@ -64,7 +64,7 @@ message ListEntriesRequest { } message ListEntriesResponse { - repeated Entry entries = 1; + Entry entry = 1; } message Entry { -- cgit v1.2.3 From 987108a2b16e61595d0c7768ca47937d0652d5c4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Dec 2019 10:04:53 -0800 Subject: HCFS: streaming list files --- .../client/src/main/java/seaweedfs/client/FilerClient.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index a1e3cdb89..84aa26ad9 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -7,6 +7,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; public class FilerClient { @@ -173,17 +174,18 @@ public class FilerClient { } public List listEntries(String path, String entryPrefix, String lastEntryName, int limit) { - List entries = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + Iterator iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) .setLimit(limit) - .build()).getEntriesList(); - List fixedEntries = new ArrayList<>(entries.size()); - for (FilerProto.Entry entry : entries) { - fixedEntries.add(fixEntryAfterReading(entry)); + .build()); + List entries = new ArrayList<>(); + while (iter.hasNext()){ + FilerProto.ListEntriesResponse resp = iter.next(); + entries.add(fixEntryAfterReading(resp.getEntry())); } - return fixedEntries; + return entries; } public FilerProto.Entry lookupEntry(String directory, String entryName) { -- cgit v1.2.3 From d4e75a0d183b57180b2ff0be2531db540c0c9aa6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 22 Jan 2020 11:42:40 -0800 Subject: filer: option to create file only if it is new, O_EXCL --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index ef847cbe7..41c1650d4 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -123,6 +123,7 @@ message FuseAttributes { message CreateEntryRequest { string directory = 1; Entry entry = 2; + bool o_excl = 3; } message CreateEntryResponse { -- cgit v1.2.3 From 6a5c0370995653621fa8b576ea149e91875938d6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 22 Jan 2020 22:59:12 -0800 Subject: fix http range requests --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 2efa64580..b08c14467 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -63,7 +63,7 @@ public class SeaweedRead { if (!chunkView.isFullChunk) { request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); + String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); } try { -- cgit v1.2.3 From c48fc8b4de5922c44d22da306699f789353ecdd4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 25 Jan 2020 09:17:19 -0800 Subject: grpc send error via response instead of grpc error --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 41c1650d4..6357d971f 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -127,6 +127,7 @@ message CreateEntryRequest { } message CreateEntryResponse { + string error = 1; } message UpdateEntryRequest { -- cgit v1.2.3 From d8dec2323bde1a5ab787b719a240969852004456 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 24 Feb 2020 14:34:14 -0800 Subject: s3: move buckets folder configuration to filer --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6357d971f..909458daf 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -219,4 +219,5 @@ message GetFilerConfigurationResponse { string replication = 2; string collection = 3; uint32 max_mb = 4; + string dir_buckets = 5; } -- cgit v1.2.3 From 6ab7368ef2556ef086d13c6d0d4454f1e98a5cd8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 24 Feb 2020 22:28:45 -0800 Subject: filer: dynamically create bucket under /buckets folder --- other/java/client/src/main/proto/filer.proto | 3 +++ 1 file changed, 3 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 909458daf..6892effe8 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -165,6 +165,7 @@ message AssignVolumeRequest { string replication = 3; int32 ttl_sec = 4; string data_center = 5; + string parent_path = 6; } message AssignVolumeResponse { @@ -173,6 +174,8 @@ message AssignVolumeResponse { string public_url = 3; int32 count = 4; string auth = 5; + string collection = 6; + string replication = 7; } message LookupVolumeRequest { -- cgit v1.2.3 From bc38b72a20bd79bf67ee1770e20dcd538285cedf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 14:38:36 -0800 Subject: s3: implemented DeleteMultipleObjects --- other/java/client/src/main/proto/filer.proto | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6892effe8..d26c5595f 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -24,6 +24,9 @@ service SeaweedFiler { rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } + rpc StreamDeleteEntries (stream DeleteEntryRequest) returns (stream DeleteEntryResponse) { + } + rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { } @@ -147,6 +150,7 @@ message DeleteEntryRequest { } message DeleteEntryResponse { + string error = 1; } message AtomicRenameEntryRequest { -- cgit v1.2.3 From 0841bedb150fb7d4a96c237961474310942c2454 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 17:15:09 -0800 Subject: move filer assign volume grpc errror to response --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index d26c5595f..04901770a 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -180,6 +180,7 @@ message AssignVolumeResponse { string auth = 5; string collection = 6; string replication = 7; + string error = 8; } message LookupVolumeRequest { -- cgit v1.2.3 From 555413d9fc4837302ef1c5b2b921b406c9de6777 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 27 Feb 2020 00:07:13 -0800 Subject: weed queue starts --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 04901770a..9ee552561 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -228,4 +228,5 @@ message GetFilerConfigurationResponse { string collection = 3; uint32 max_mb = 4; string dir_buckets = 5; + string dir_queues = 6; } -- cgit v1.2.3 From 13e215ee5cb5f4c2873f89c263d8c970e9978b19 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2020 00:49:47 -0800 Subject: filer: option to encrypt data on volume server --- other/java/client/src/main/proto/filer.proto | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 9ee552561..5983c84d8 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -99,6 +99,7 @@ message FileChunk { string source_file_id = 6; // to be deprecated FileId fid = 7; FileId source_fid = 8; + bytes cipher_key = 9; } message FileId { @@ -229,4 +230,5 @@ message GetFilerConfigurationResponse { uint32 max_mb = 4; string dir_buckets = 5; string dir_queues = 6; + bool cipher = 7; } -- cgit v1.2.3 From 2e3f6ad3a97bc7fad349e63289695547f92c1f8b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2020 21:39:33 -0700 Subject: filer: remember content is gzipped or not --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 5983c84d8..8df46e917 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -100,6 +100,7 @@ message FileChunk { FileId fid = 7; FileId source_fid = 8; bytes cipher_key = 9; + bool is_gzipped = 10; } message FileId { -- cgit v1.2.3 From de1ba85346bd2af41ebb98f152bb769e2b630139 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 14 Mar 2020 00:27:57 -0700 Subject: HDFS support encrypted data storage --- .../java/seaweedfs/client/FilerGrpcClient.java | 33 ++++- .../src/main/java/seaweedfs/client/Gzip.java | 37 ++++++ .../main/java/seaweedfs/client/SeaweedCipher.java | 55 +++++++++ .../main/java/seaweedfs/client/SeaweedRead.java | 137 +++++++++++++++------ .../main/java/seaweedfs/client/SeaweedWrite.java | 40 +++++- .../java/seaweedfs/client/SeaweedCipherTest.java | 42 +++++++ 6 files changed, 295 insertions(+), 49 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/Gzip.java create mode 100644 other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java create mode 100644 other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 3626c76de..3f5d1e8e9 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -14,12 +14,6 @@ import java.util.concurrent.TimeUnit; public class FilerGrpcClient { private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); - - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; - static SslContext sslContext; static { @@ -30,6 +24,14 @@ public class FilerGrpcClient { } } + private final ManagedChannel channel; + private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; + private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; + private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + private boolean cipher = false; + private String collection = ""; + private String replication = ""; + public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); } @@ -42,6 +44,13 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = + this.getBlockingStub().getFilerConfiguration( + FilerProto.GetFilerConfigurationRequest.newBuilder().build()); + cipher = filerConfigurationResponse.getCipher(); + collection = filerConfigurationResponse.getCollection(); + replication = filerConfigurationResponse.getReplication(); + } public FilerGrpcClient(ManagedChannelBuilder channelBuilder) { @@ -51,6 +60,18 @@ public class FilerGrpcClient { futureStub = SeaweedFilerGrpc.newFutureStub(channel); } + public boolean isCipher() { + return cipher; + } + + public String getCollection() { + return collection; + } + + public String getReplication() { + return replication; + } + public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java new file mode 100644 index 000000000..248285dd3 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -0,0 +1,37 @@ +package seaweedfs.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Gzip { + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length); + GZIPOutputStream gzip = new GZIPOutputStream(bos); + gzip.write(data); + gzip.close(); + byte[] compressed = bos.toByteArray(); + bos.close(); + return compressed; + } + + public static byte[] decompress(byte[] compressed) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } + + private static byte[] readAll(InputStream input) throws IOException { + try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + byte[] buffer = new byte[4096]; + int n; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + } + return output.toByteArray(); + } + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java new file mode 100644 index 000000000..8d0ebd755 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -0,0 +1,55 @@ +package seaweedfs.client; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.security.SecureRandom; + +public class SeaweedCipher { + // AES-GCM parameters + public static final int AES_KEY_SIZE = 256; // in bits + public static final int GCM_NONCE_LENGTH = 12; // in bytes + public static final int GCM_TAG_LENGTH = 16; // in bytes + + private static SecureRandom random = new SecureRandom(); + + public static byte[] genCipherKey() throws Exception { + byte[] key = new byte[AES_KEY_SIZE / 8]; + random.nextBytes(key); + return key; + } + + public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception { + return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey); + } + + public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception { + + final byte[] nonce = new byte[GCM_NONCE_LENGTH]; + random.nextBytes(nonce); + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + + Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec); + + byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); + + byte[] iv = AES_cipherInstance.getIV(); + byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); + + return message; + } + + public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception { + final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params); + byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH); + return decryptedText; + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index b08c14467..d2717056f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -6,6 +6,7 @@ import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; import java.io.Closeable; import java.io.IOException; @@ -31,7 +32,7 @@ public class SeaweedRead { } FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); + .getBlockingStub().lookupVolume(lookupRequest.build()); Map vid2Locations = lookupResponse.getLocationsMapMap(); @@ -56,14 +57,18 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + if (chunkView.cipherKey != null) { + return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); + } + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); if (!chunkView.isFullChunk) { request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); + String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); } try { @@ -85,6 +90,44 @@ public class SeaweedRead { } } + private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + HttpClient client = new DefaultHttpClient(); + HttpGet request = new HttpGet( + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + + request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + + byte[] data = null; + + try { + HttpResponse response = client.execute(request); + HttpEntity entity = response.getEntity(); + + data = EntityUtils.toByteArray(entity); + + } finally { + if (client instanceof Closeable) { + Closeable t = (Closeable) client; + t.close(); + } + } + + if (chunkView.isGzipped) { + data = Gzip.decompress(data); + } + + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } + + int len = (int) (chunkView.logicOffset - position + chunkView.size); + System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len); + return len; + + } + protected static List viewFromVisibles(List visibleIntervals, long offset, long size) { List views = new ArrayList<>(); @@ -93,11 +136,13 @@ public class SeaweedRead { if (chunk.start <= offset && offset < chunk.stop && offset < stop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, - isFullChunk + chunk.fileId, + offset - chunk.start, + Math.min(chunk.stop, stop) - offset, + offset, + isFullChunk, + chunk.cipherKey, + chunk.isGzipped )); offset = Math.min(chunk.stop, stop); } @@ -127,11 +172,13 @@ public class SeaweedRead { List newVisibles, FilerProto.FileChunk chunk) { VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - true + chunk.getOffset(), + chunk.getOffset() + chunk.getSize(), + chunk.getFileId(), + chunk.getMtime(), + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsGzipped() ); // easy cases to speed up @@ -147,21 +194,25 @@ public class SeaweedRead { for (VisibleInterval v : visibles) { if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - false + v.start, + chunk.getOffset(), + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } long chunkStop = chunk.getOffset() + chunk.getSize(); if (v.start < chunkStop && chunkStop < v.stop) { newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false + chunkStop, + v.stop, + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -208,24 +259,30 @@ public class SeaweedRead { public final long modifiedTime; public final String fileId; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "VisibleInterval{" + - "start=" + start + - ", stop=" + stop + - ", modifiedTime=" + modifiedTime + - ", fileId='" + fileId + '\'' + - ", isFullChunk=" + isFullChunk + - '}'; + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } @@ -235,24 +292,30 @@ public class SeaweedRead { public final long size; public final long logicOffset; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "ChunkView{" + - "fileId='" + fileId + '\'' + - ", offset=" + offset + - ", size=" + size + - ", logicOffset=" + logicOffset + - ", isFullChunk=" + isFullChunk + - '}'; + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 0663e8d98..06c1bdd9f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,5 +1,6 @@ package seaweedfs.client; +import com.google.protobuf.ByteString; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; @@ -11,9 +12,12 @@ import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; public class SeaweedWrite { + private static SecureRandom random = new SecureRandom(); + public static void writeData(FilerProto.Entry.Builder entry, final String replication, final FilerGrpcClient filerGrpcClient, @@ -22,10 +26,9 @@ public class SeaweedWrite { final long bytesOffset, final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) + .setCollection(filerGrpcClient.getCollection()) + .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) .setDataCenter("") - .setReplication("") .setTtlSec(0) .build()); String fileId = response.getFileId(); @@ -33,7 +36,14 @@ public class SeaweedWrite { String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength); + ByteString cipherKeyString = null; + byte[] cipherKey = null; + if (filerGrpcClient.isCipher()) { + cipherKey = genCipherKey(); + cipherKeyString = ByteString.copyFrom(cipherKey); + } + + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) @@ -41,6 +51,7 @@ public class SeaweedWrite { .setSize(bytesLength) .setMtime(System.currentTimeMillis() / 10000L) .setETag(etag) + .setCipherKey(cipherKeyString) ); } @@ -58,11 +69,22 @@ public class SeaweedWrite { private static String multipartUpload(String targetUrl, String auth, final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { HttpClient client = new DefaultHttpClient(); - InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + InputStream inputStream = null; + if (cipherKey == null) { + inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + } else { + try { + byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey); + inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length); + } catch (Exception e) { + throw new IOException("fail to encrypt data", e); + } + } HttpPost post = new HttpPost(targetUrl); if (auth != null && auth.length() != 0) { @@ -92,4 +114,10 @@ public class SeaweedWrite { } } + + private static byte[] genCipherKey() { + byte[] b = new byte[32]; + random.nextBytes(b); + return b; + } } diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java new file mode 100644 index 000000000..7b5e53e19 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java @@ -0,0 +1,42 @@ +package seaweedfs.client; + +import org.junit.Test; + +import java.util.Base64; + +import static seaweedfs.client.SeaweedCipher.decrypt; +import static seaweedfs.client.SeaweedCipher.encrypt; + +public class SeaweedCipherTest { + + @Test + public void testSameAsGoImplemnetation() throws Exception { + byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + + @Test + public void testEncryptDecrypt() throws Exception { + byte[] secretKey = SeaweedCipher.genCipherKey(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + +} -- cgit v1.2.3 From c4bea45099a3768dae7ea683afa16f2154b01ffb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 20 Mar 2020 14:17:31 -0700 Subject: S3 API: fix DeleteMultipleObjectsHandler fix https://github.com/chrislusf/seaweedfs/issues/1241 --- other/java/client/src/main/proto/filer.proto | 3 --- 1 file changed, 3 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 8df46e917..b998c330c 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -24,9 +24,6 @@ service SeaweedFiler { rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } - rpc StreamDeleteEntries (stream DeleteEntryRequest) returns (stream DeleteEntryResponse) { - } - rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { } -- cgit v1.2.3 From 3d3bab2447c5524e2db67b4e4b4f077d9b387f5a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2020 23:43:24 -0700 Subject: add notes --- other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 06c1bdd9f..5bf24ef68 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -45,6 +45,8 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); + // TODO: cache fileId ~ bytes here + entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) .setOffset(offset) -- cgit v1.2.3 From ae2309dc58dbfb231fbf06ed7e16055a4fbc5df9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2020 23:58:59 -0700 Subject: Update SeaweedRead.java --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index d2717056f..8e850f87e 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -61,6 +61,8 @@ public class SeaweedRead { return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); } + // TODO: read the chunk and returns the chunk view data here + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); -- cgit v1.2.3 From b656e05aaf71208a7c146839b6432fe2aaaea5a6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 29 Mar 2020 00:55:40 -0700 Subject: HDFS: support chunk cache --- .../src/main/java/seaweedfs/client/ChunkCache.java | 27 +++++++++++++ .../main/java/seaweedfs/client/SeaweedRead.java | 45 ++++++---------------- .../main/java/seaweedfs/client/SeaweedWrite.java | 3 +- 3 files changed, 41 insertions(+), 34 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/ChunkCache.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java new file mode 100644 index 000000000..e249d4524 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -0,0 +1,27 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class ChunkCache { + + private final Cache cache; + + public ChunkCache(int maxEntries) { + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(1, TimeUnit.HOURS) + .build(); + } + + public byte[] getChunk(String fileId) { + return this.cache.getIfPresent(fileId); + } + + public void setChunk(String fileId, byte[] data) { + this.cache.put(fileId, data); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 8e850f87e..ad92ba006 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -10,14 +10,14 @@ import org.apache.http.util.EntityUtils; import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + static ChunkCache chunkCache = new ChunkCache(1000); + // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, @@ -57,42 +57,23 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { - if (chunkView.cipherKey != null) { - return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); - } - - // TODO: read the chunk and returns the chunk view data here - HttpClient client = new DefaultHttpClient(); - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + byte[] chunkData = chunkCache.getChunk(chunkView.fileId); - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); + if (chunkData == null) { + chunkData = doFetchFullChunkData(chunkView, locations); } - try { - HttpResponse response = client.execute(request); - HttpEntity entity = response.getEntity(); - - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); + int len = (int) (chunkView.logicOffset - position + chunkView.size); + System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); - return len; + chunkCache.setChunk(chunkView.fileId, chunkData); - } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); - } - } + return len; } - private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); @@ -124,9 +105,7 @@ public class SeaweedRead { throw new IOException("fail to decrypt", e); } - int len = (int) (chunkView.logicOffset - position + chunkView.size); - System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len); - return len; + return data; } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 5bf24ef68..178234d5a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -45,7 +45,8 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - // TODO: cache fileId ~ bytes here + // cache fileId ~ bytes + SeaweedRead.chunkCache.setChunk(fileId, bytes); entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId) -- cgit v1.2.3 From 50a5018b7fb259cee84471ce8d7bb6e554602c61 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 30 Mar 2020 01:19:33 -0700 Subject: writing meta logs is working --- other/java/client/src/main/proto/filer.proto | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index b998c330c..3975b517d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -42,6 +42,9 @@ service SeaweedFiler { rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } + rpc ListenForEvents (ListenForEventsRequest) returns (stream FullEventNotification) { + } + } ////////////////////////////////////////////////// @@ -230,3 +233,19 @@ message GetFilerConfigurationResponse { string dir_queues = 6; bool cipher = 7; } + +message ListenForEventsRequest { + string client_name = 1; + string directory = 2; + int64 since_sec = 3; +} +message FullEventNotification { + string directory = 1; + EventNotification event_notification = 2; +} + +message LogEntry { + int64 ts_ns = 1; + int32 partition_key_hash = 2; + bytes data = 3; +} -- cgit v1.2.3 From bf270d9e8c01052409464193b693d50fa09a70a9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Apr 2020 00:51:16 -0700 Subject: filer: able to tail meta data changes --- other/java/client/src/main/proto/filer.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 3975b517d..12f1cd85b 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -237,7 +237,7 @@ message GetFilerConfigurationResponse { message ListenForEventsRequest { string client_name = 1; string directory = 2; - int64 since_sec = 3; + int64 since_ns = 3; } message FullEventNotification { string directory = 1; -- cgit v1.2.3 From 2d43f85577cb998656abf2525445b72fcd8cc48d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Apr 2020 12:51:21 -0700 Subject: watch entries with common path prefix --- other/java/client/src/main/proto/filer.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 12f1cd85b..e504e4f84 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -236,7 +236,7 @@ message GetFilerConfigurationResponse { message ListenForEventsRequest { string client_name = 1; - string directory = 2; + string path_prefix = 2; int64 since_ns = 3; } message FullEventNotification { -- cgit v1.2.3 From ec2eb8bc4804f9b880f256a55e3cbfc0923b6a29 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 8 Apr 2020 08:12:00 -0700 Subject: add If-None-Match and If-Modified-Since fix https://github.com/chrislusf/seaweedfs/issues/1269 --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index e504e4f84..8f8176ff4 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -123,6 +123,7 @@ message FuseAttributes { string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; + bytes md5 = 14; } message CreateEntryRequest { -- cgit v1.2.3 From 6f948e48879ed0706b044bea0429d3fc48d6e8e1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 13:07:59 -0700 Subject: remove configurable topics folder location --- other/java/client/src/main/proto/filer.proto | 1 - 1 file changed, 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 8f8176ff4..40316f58b 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -231,7 +231,6 @@ message GetFilerConfigurationResponse { string collection = 3; uint32 max_mb = 4; string dir_buckets = 5; - string dir_queues = 6; bool cipher = 7; } -- cgit v1.2.3 From 7764e0465ce976bb528c27bb9aa25857102570ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 21:00:55 -0700 Subject: refactoring --- other/java/client/src/main/proto/filer.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 40316f58b..fd2b8ebe3 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -42,7 +42,7 @@ service SeaweedFiler { rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } - rpc ListenForEvents (ListenForEventsRequest) returns (stream FullEventNotification) { + rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { } } @@ -234,12 +234,12 @@ message GetFilerConfigurationResponse { bool cipher = 7; } -message ListenForEventsRequest { +message SubscribeMetadataRequest { string client_name = 1; string path_prefix = 2; int64 since_ns = 3; } -message FullEventNotification { +message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; } -- cgit v1.2.3 From bda82f61bc847d6d02ebd9b242c07e01588b4e30 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 17 Apr 2020 02:28:09 -0700 Subject: filer: able to append to a file --- other/java/client/src/main/proto/filer.proto | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index fd2b8ebe3..bc159fd14 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -21,6 +21,9 @@ service SeaweedFiler { rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) { } + rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) { + } + rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } @@ -143,6 +146,14 @@ message UpdateEntryRequest { message UpdateEntryResponse { } +message AppendToEntryRequest { + string directory = 1; + string entry_name = 2; + repeated FileChunk chunks = 3; +} +message AppendToEntryResponse { +} + message DeleteEntryRequest { string directory = 1; string name = 2; -- cgit v1.2.3 From e24b25de784daf42a15daf573249d608ebc2b44a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 21 Apr 2020 21:16:13 -0700 Subject: async meta caching: can stream updates now --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index bc159fd14..6d890861d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -253,6 +253,7 @@ message SubscribeMetadataRequest { message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; + int64 ts_ns = 3; } message LogEntry { -- cgit v1.2.3 From b52b8ec68553781408d1ac8c6f7a2cd4d935aea6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 Apr 2020 05:21:54 -0700 Subject: Hadoop: fix entry not found for HCFS also fix cipher related changes. --- .../client/src/main/java/seaweedfs/client/FilerClient.java | 9 ++++++--- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 10 ++++++---- .../client/src/main/java/seaweedfs/client/SeaweedWrite.java | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 84aa26ad9..ef32c7e9a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -14,7 +14,7 @@ public class FilerClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private FilerGrpcClient filerGrpcClient; + private final FilerGrpcClient filerGrpcClient; public FilerClient(String host, int grpcPort) { filerGrpcClient = new FilerGrpcClient(host, grpcPort); @@ -181,7 +181,7 @@ public class FilerClient { .setLimit(limit) .build()); List entries = new ArrayList<>(); - while (iter.hasNext()){ + while (iter.hasNext()) { FilerProto.ListEntriesResponse resp = iter.next(); entries.add(fixEntryAfterReading(resp.getEntry())); } @@ -195,9 +195,12 @@ public class FilerClient { .setDirectory(directory) .setName(entryName) .build()).getEntry(); + if (entry == null) { + return null; + } return fixEntryAfterReading(entry); } catch (Exception e) { - if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){ + if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) { return null; } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index ad92ba006..1e4a158c6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -99,10 +99,12 @@ public class SeaweedRead { data = Gzip.decompress(data); } - try { - data = SeaweedCipher.decrypt(data, chunkView.cipherKey); - } catch (Exception e) { - throw new IOException("fail to decrypt", e); + if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } } return data; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 178234d5a..dc6203e52 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -36,7 +36,7 @@ public class SeaweedWrite { String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - ByteString cipherKeyString = null; + ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; if (filerGrpcClient.isCipher()) { cipherKey = genCipherKey(); @@ -78,7 +78,7 @@ public class SeaweedWrite { HttpClient client = new DefaultHttpClient(); InputStream inputStream = null; - if (cipherKey == null) { + if (cipherKey == null || cipherKey.length == 0) { inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); } else { try { -- cgit v1.2.3 From 1e3e4b3072071341b4bb4b0bb7c611457e927f97 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 5 May 2020 02:05:28 -0700 Subject: add broker connects to filer --- other/java/client/src/main/proto/filer.proto | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6d890861d..3b3b78bbb 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -48,6 +48,8 @@ service SeaweedFiler { rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { } + rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { + } } ////////////////////////////////////////////////// @@ -261,3 +263,10 @@ message LogEntry { int32 partition_key_hash = 2; bytes data = 3; } + +message KeepConnectedRequest { + string name = 1; + uint32 grpc_port = 2; +} +message KeepConnectedResponse { +} -- cgit v1.2.3 From dfccc3c2637693dce141c27a321ba5d3aea1ace9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 8 May 2020 02:47:22 -0700 Subject: able to read chan and write chan --- other/java/client/src/main/proto/filer.proto | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 3b3b78bbb..1fc8ef63d 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -50,6 +50,10 @@ service SeaweedFiler { rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { } + + rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { + } + } ////////////////////////////////////////////////// @@ -267,6 +271,21 @@ message LogEntry { message KeepConnectedRequest { string name = 1; uint32 grpc_port = 2; + repeated string resources = 3; } message KeepConnectedResponse { } + +message LocateBrokerRequest { + string resource = 1; +} +message LocateBrokerResponse { + bool found = 1; + // if found, send the exact address + // if not found, send the full list of existing brokers + message Resource { + string grpc_addresses = 1; + int32 resource_count = 2; + } + repeated Resource resources = 2; +} -- cgit v1.2.3 From 8dfaaeabfd97efff0e17af5cdf5b2d647bdb9b88 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 May 2020 17:07:34 -0700 Subject: HCFS: 1.2.8 fix hbase related bugs 1. SeaweedFileSystem.listStatus need to work with file also 2. SeaweedRead readChunkView has wrong len --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 1e4a158c6..7be39da53 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -7,6 +7,8 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -14,7 +16,7 @@ import java.util.*; public class SeaweedRead { - // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); static ChunkCache chunkCache = new ChunkCache(1000); @@ -64,7 +66,9 @@ public class SeaweedRead { chunkData = doFetchFullChunkData(chunkView, locations); } - int len = (int) (chunkView.logicOffset - position + chunkView.size); + int len = (int) chunkView.size; + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", + chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); chunkCache.setChunk(chunkView.fileId, chunkData); -- cgit v1.2.3 From 222f93e8166ed73be3afeb1a656ff5007b82e525 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jun 2020 14:59:43 -0700 Subject: possibly fix concurrent access to entry object in SeaweedOutputStream --- .../main/java/seaweedfs/client/SeaweedWrite.java | 34 ++++++++++++---------- 1 file changed, 19 insertions(+), 15 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index dc6203e52..18ec77b76 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -45,28 +45,32 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); + synchronized (entry) { + entry.addChunks(FilerProto.FileChunk.newBuilder() + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) + .setCipherKey(cipherKeyString) + ); + } + // cache fileId ~ bytes SeaweedRead.chunkCache.setChunk(fileId, bytes); - entry.addChunks(FilerProto.FileChunk.newBuilder() - .setFileId(fileId) - .setOffset(offset) - .setSize(bytesLength) - .setMtime(System.currentTimeMillis() / 10000L) - .setETag(etag) - .setCipherKey(cipherKeyString) - ); - } public static void writeMeta(final FilerGrpcClient filerGrpcClient, final String parentDirectory, final FilerProto.Entry.Builder entry) { - filerGrpcClient.getBlockingStub().createEntry( - FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parentDirectory) - .setEntry(entry) - .build() - ); + synchronized (entry){ + filerGrpcClient.getBlockingStub().createEntry( + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parentDirectory) + .setEntry(entry) + .build() + ); + } } private static String multipartUpload(String targetUrl, -- cgit v1.2.3 From ca3516ac6d9cc266b4953d96360c0c733e7244a3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 20 Jun 2020 08:00:25 -0700 Subject: adjust protoc --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 1fc8ef63d..d3a5ea3a5 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package filer_pb; +option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"; option java_package = "seaweedfs.client"; option java_outer_classname = "FilerProto"; -- cgit v1.2.3 From 97239ce6f109448e1f0488db31a84d40837cb168 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 20 Jun 2020 08:15:49 -0700 Subject: rename filechunk is_gzipped to is_compressed --- other/java/client/src/main/proto/filer.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index d3a5ea3a5..9f55b23f3 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -110,7 +110,7 @@ message FileChunk { FileId fid = 7; FileId source_fid = 8; bytes cipher_key = 9; - bool is_gzipped = 10; + bool is_compressed = 10; } message FileId { -- cgit v1.2.3 From 31e23e97837ba8d6a403e0b8c2d9ea6ff0bc0387 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 30 Jun 2020 22:53:53 -0700 Subject: filer: support active<=>active filer replication --- other/java/client/src/main/proto/filer.proto | 3 +++ 1 file changed, 3 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 9f55b23f3..a65ef8f3a 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -140,6 +140,7 @@ message CreateEntryRequest { string directory = 1; Entry entry = 2; bool o_excl = 3; + bool is_from_other_cluster = 4; } message CreateEntryResponse { @@ -149,6 +150,7 @@ message CreateEntryResponse { message UpdateEntryRequest { string directory = 1; Entry entry = 2; + bool is_from_other_cluster = 3; } message UpdateEntryResponse { } @@ -168,6 +170,7 @@ message DeleteEntryRequest { bool is_delete_data = 4; bool is_recursive = 5; bool ignore_recursive_error = 6; + bool is_from_other_cluster = 7; } message DeleteEntryResponse { -- cgit v1.2.3 From 37d5b3ba12a5eed0b534815ca1aad2ca3d8566bd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 1 Jul 2020 08:06:20 -0700 Subject: replication: pass isFromOtherCluster also to EventNotification EventNotification is consistent with message queue and metadata logs. --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index a65ef8f3a..9fc0669ab 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -98,6 +98,7 @@ message EventNotification { Entry new_entry = 2; bool delete_chunks = 3; string new_parent_path = 4; + bool is_from_other_cluster = 5; } message FileChunk { -- cgit v1.2.3 From 70d8a3a1d3395ece4a37c5aac1249293b57d5975 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Jul 2020 15:50:07 -0700 Subject: add SubscribeLocalMetadata without checking persisted meta logs --- other/java/client/src/main/proto/filer.proto | 3 +++ 1 file changed, 3 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 9fc0669ab..37121f29c 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -49,6 +49,9 @@ service SeaweedFiler { rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { } + rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { + } + rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { } -- cgit v1.2.3