diff options
| author | yourchanges <yourchanges@gmail.com> | 2020-07-10 09:44:32 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-10 09:44:32 +0800 |
| commit | e67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch) | |
| tree | 4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /other/java/client/src | |
| parent | 2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff) | |
| parent | 1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff) | |
| download | seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip | |
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'other/java/client/src')
11 files changed, 738 insertions, 167 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java new file mode 100644 index 000000000..e249d4524 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -0,0 +1,27 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class ChunkCache { + + private final Cache<String, byte[]> cache; + + public ChunkCache(int maxEntries) { + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(1, TimeUnit.HOURS) + .build(); + } + + public byte[] getChunk(String fileId) { + return this.cache.getIfPresent(fileId); + } + + public void setChunk(String fileId, byte[] data) { + this.cache.put(fileId, data); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 63d0d8320..ef32c7e9a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -7,13 +7,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; public class FilerClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private FilerGrpcClient filerGrpcClient; + private final FilerGrpcClient filerGrpcClient; public FilerClient(String host, int grpcPort) { filerGrpcClient = new FilerGrpcClient(host, grpcPort); @@ -34,13 +35,12 @@ public class FilerClient { public boolean mkdirs(String path, int mode, int uid, int gid, String userName, String[] groupNames) { - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); - if ("/".equals(path)) { return true; } + Path pathObject = Paths.get(path); + String parent = pathObject.getParent().toString(); + String name = pathObject.getFileName().toString(); mkdirs(parent, mode, uid, gid, userName, groupNames); @@ -51,23 +51,38 @@ public class FilerClient { } return createEntry( - parent, - newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() + parent, + newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() ); } - public boolean rm(String path, boolean isRecursive) { + public boolean mv(String oldPath, String newPath) { + + Path oldPathObject = Paths.get(oldPath); + String oldParent = oldPathObject.getParent().toString(); + String oldName = oldPathObject.getFileName().toString(); + + Path newPathObject = Paths.get(newPath); + String newParent = newPathObject.getParent().toString(); + String newName = newPathObject.getFileName().toString(); + + return atomicRenameEntry(oldParent, oldName, newParent, newName); + + } + + public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) { Path pathObject = Paths.get(path); String parent = pathObject.getParent().toString(); String name = pathObject.getFileName().toString(); return deleteEntry( - parent, - name, - true, - isRecursive); + parent, + name, + true, + isRecursive, + ignoreRecusiveError); } public boolean touch(String path, int mode) { @@ -84,18 +99,18 @@ public class FilerClient { FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { return createEntry( - parent, - newFileEntry(name, mode, uid, gid, userName, groupNames).build() + parent, + newFileEntry(name, mode, uid, gid, userName, groupNames).build() ); } long now = System.currentTimeMillis() / 1000L; FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder() - .setMtime(now) - .setUid(uid) - .setGid(gid) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames)); + .setMtime(now) + .setUid(uid) + .setGid(gid) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames)); return updateEntry(parent, entry.toBuilder().setAttributes(attr).build()); } @@ -105,17 +120,17 @@ public class FilerClient { long now = System.currentTimeMillis() / 1000L; return FilerProto.Entry.newBuilder() - .setName(name) - .setIsDirectory(true) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setUid(uid) - .setGid(gid) - .setFileMode(mode | 1 << 31) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames))); + .setName(name) + .setIsDirectory(true) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode | 1 << 31) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); } public FilerProto.Entry.Builder newFileEntry(String name, int mode, @@ -124,17 +139,17 @@ public class FilerClient { long now = System.currentTimeMillis() / 1000L; return FilerProto.Entry.newBuilder() - .setName(name) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setUid(uid) - .setGid(gid) - .setFileMode(mode) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames))); + .setName(name) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); } public List<FilerProto.Entry> listEntries(String path) { @@ -159,22 +174,35 @@ public class FilerClient { } public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) { - return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() - .setDirectory(path) - .setPrefix(entryPrefix) - .setStartFromFileName(lastEntryName) - .setLimit(limit) - .build()).getEntriesList(); + Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + .setDirectory(path) + .setPrefix(entryPrefix) + .setStartFromFileName(lastEntryName) + .setLimit(limit) + .build()); + List<FilerProto.Entry> entries = new ArrayList<>(); + while (iter.hasNext()) { + FilerProto.ListEntriesResponse resp = iter.next(); + entries.add(fixEntryAfterReading(resp.getEntry())); + } + return entries; } public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - return filerGrpcClient.getBlockingStub().lookupDirectoryEntry( - FilerProto.LookupDirectoryEntryRequest.newBuilder() - .setDirectory(directory) - .setName(entryName) - .build()).getEntry(); + FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(directory) + .setName(entryName) + .build()).getEntry(); + if (entry == null) { + return null; + } + return fixEntryAfterReading(entry); } catch (Exception e) { + if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) { + return null; + } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); return null; } @@ -184,9 +212,9 @@ public class FilerClient { public boolean createEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; @@ -197,9 +225,9 @@ public class FilerClient { public boolean updateEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; @@ -207,14 +235,15 @@ public class FilerClient { return true; } - public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) { + public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { try { filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() - .setDirectory(parent) - .setName(entryName) - .setIsDeleteData(isDeleteFileChunk) - .setIsRecursive(isRecursive) - .build()); + .setDirectory(parent) + .setName(entryName) + .setIsDeleteData(isDeleteFileChunk) + .setIsRecursive(isRecursive) + .setIgnoreRecursiveError(ignoreRecusiveError) + .build()); } catch (Exception e) { LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e); return false; @@ -222,4 +251,39 @@ public class FilerClient { return true; } + public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { + try { + filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + .setOldDirectory(oldParent) + .setOldName(oldName) + .setNewDirectory(newParent) + .setNewName(newName) + .build()); + } catch (Exception e) { + LOG.warn("atomicRenameEntry {}/{} => {}/{}: {}", oldParent, oldName, newParent, newName, e); + return false; + } + return true; + } + + private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) { + if (entry.getChunksList().size() <= 0) { + return entry; + } + String fileId = entry.getChunks(0).getFileId(); + if (fileId != null && fileId.length() != 0) { + return entry; + } + FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); + entryBuilder.clearChunks(); + for (FilerProto.FileChunk chunk : entry.getChunksList()) { + FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); + FilerProto.FileId fid = chunk.getFid(); + fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); + chunkBuilder.setFileId(fileId); + entryBuilder.addChunks(chunkBuilder); + } + return entryBuilder.build(); + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 16b7c3249..3f5d1e8e9 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -2,22 +2,55 @@ package seaweedfs.client; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLException; import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; public class FilerGrpcClient { - private static final Logger logger = Logger.getLogger(FilerGrpcClient.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); + static SslContext sslContext; + + static { + try { + sslContext = FilerSslContext.loadSslContext(); + } catch (SSLException e) { + logger.warn("failed to load ssl context", e); + } + } private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; - + private boolean cipher = false; + private String collection = ""; + private String replication = ""; public FilerGrpcClient(String host, int grpcPort) { - this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()); + this(host, grpcPort, sslContext); + } + + public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) { + + this(sslContext == null ? + ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() : + NettyChannelBuilder.forAddress(host, grpcPort) + .negotiationType(NegotiationType.TLS) + .sslContext(sslContext)); + + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = + this.getBlockingStub().getFilerConfiguration( + FilerProto.GetFilerConfigurationRequest.newBuilder().build()); + cipher = filerConfigurationResponse.getCipher(); + collection = filerConfigurationResponse.getCollection(); + replication = filerConfigurationResponse.getReplication(); + } public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { @@ -27,6 +60,18 @@ public class FilerGrpcClient { futureStub = SeaweedFilerGrpc.newFutureStub(channel); } + public boolean isCipher() { + return cipher; + } + + public String getCollection() { + return collection; + } + + public String getReplication() { + return replication; + } + public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } @@ -42,4 +87,5 @@ public class FilerGrpcClient { public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { return futureStub; } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java b/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java new file mode 100644 index 000000000..5a88c1da3 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java @@ -0,0 +1,64 @@ +package seaweedfs.client; + +import com.google.common.base.Strings; +import com.moandjiezana.toml.Toml; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; +import java.io.File; + +public class FilerSslContext { + + private static final Logger logger = LoggerFactory.getLogger(FilerSslContext.class); + + public static SslContext loadSslContext() throws SSLException { + String securityFileName = "security.toml"; + String home = System.getProperty("user.home"); + File f1 = new File("./"+securityFileName); + File f2 = new File(home + "/.seaweedfs/"+securityFileName); + File f3 = new File(home + "/etc/seaweedfs/"+securityFileName); + + File securityFile = f1.exists()? f1 : f2.exists() ? f2 : f3.exists()? f3 : null; + + if (securityFile==null){ + return null; + } + + Toml toml = new Toml().read(securityFile); + logger.debug("reading ssl setup from {}", securityFile); + + String trustCertCollectionFilePath = toml.getString("grpc.ca"); + logger.debug("loading ca from {}", trustCertCollectionFilePath); + String clientCertChainFilePath = toml.getString("grpc.client.cert"); + logger.debug("loading client ca from {}", clientCertChainFilePath); + String clientPrivateKeyFilePath = toml.getString("grpc.client.key"); + logger.debug("loading client key from {}", clientPrivateKeyFilePath); + + if (Strings.isNullOrEmpty(clientPrivateKeyFilePath) && Strings.isNullOrEmpty(clientPrivateKeyFilePath)){ + return null; + } + + // possibly fix the format https://netty.io/wiki/sslcontextbuilder-and-private-key.html + + return buildSslContext(trustCertCollectionFilePath, clientCertChainFilePath, clientPrivateKeyFilePath); + } + + + private static SslContext buildSslContext(String trustCertCollectionFilePath, + String clientCertChainFilePath, + String clientPrivateKeyFilePath) throws SSLException { + SslContextBuilder builder = GrpcSslContexts.forClient(); + if (trustCertCollectionFilePath != null) { + builder.trustManager(new File(trustCertCollectionFilePath)); + } + if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) { + builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath)); + } + return builder.trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java new file mode 100644 index 000000000..248285dd3 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -0,0 +1,37 @@ +package seaweedfs.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Gzip { + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length); + GZIPOutputStream gzip = new GZIPOutputStream(bos); + gzip.write(data); + gzip.close(); + byte[] compressed = bos.toByteArray(); + bos.close(); + return compressed; + } + + public static byte[] decompress(byte[] compressed) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } + + private static byte[] readAll(InputStream input) throws IOException { + try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + byte[] buffer = new byte[4096]; + int n; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + } + return output.toByteArray(); + } + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java new file mode 100644 index 000000000..8d0ebd755 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -0,0 +1,55 @@ +package seaweedfs.client; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.security.SecureRandom; + +public class SeaweedCipher { + // AES-GCM parameters + public static final int AES_KEY_SIZE = 256; // in bits + public static final int GCM_NONCE_LENGTH = 12; // in bytes + public static final int GCM_TAG_LENGTH = 16; // in bytes + + private static SecureRandom random = new SecureRandom(); + + public static byte[] genCipherKey() throws Exception { + byte[] key = new byte[AES_KEY_SIZE / 8]; + random.nextBytes(key); + return key; + } + + public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception { + return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey); + } + + public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception { + + final byte[] nonce = new byte[GCM_NONCE_LENGTH]; + random.nextBytes(nonce); + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + + Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec); + + byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); + + byte[] iv = AES_cipherInstance.getIV(); + byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); + + return message; + } + + public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception { + final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params); + byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH); + return decryptedText; + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index a906a689b..7be39da53 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -5,25 +5,25 @@ import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; +import java.util.*; public class SeaweedRead { - // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + + static ChunkCache chunkCache = new ChunkCache(1000); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength) { + final int bufferLength) throws IOException { List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); @@ -34,7 +34,7 @@ public class SeaweedRead { } FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); + .getBlockingStub().lookupVolume(lookupRequest.build()); Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); @@ -58,35 +58,64 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) { - HttpClient client = HttpClientBuilder.create().build(); - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + + byte[] chunkData = chunkCache.getChunk(chunkView.fileId); - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); + if (chunkData == null) { + chunkData = doFetchFullChunkData(chunkView, locations); } + int len = (int) chunkView.size; + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", + chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); + System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); + + chunkCache.setChunk(chunkView.fileId, chunkData); + + return len; + } + + private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + + HttpClient client = new DefaultHttpClient(); + HttpGet request = new HttpGet( + String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + + request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + + byte[] data = null; + try { HttpResponse response = client.execute(request); HttpEntity entity = response.getEntity(); - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); + data = EntityUtils.toByteArray(entity); - return len; + } finally { + if (client instanceof Closeable) { + Closeable t = (Closeable) client; + t.close(); + } + } - } catch (IOException e) { - e.printStackTrace(); + if (chunkView.isGzipped) { + data = Gzip.decompress(data); } - return 0; + + if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } + } + + return data; + } - public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { + protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { List<ChunkView> views = new ArrayList<>(); long stop = offset + size; @@ -94,11 +123,13 @@ public class SeaweedRead { if (chunk.start <= offset && offset < chunk.stop && offset < stop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, - isFullChunk + chunk.fileId, + offset - chunk.start, + Math.min(chunk.stop, stop) - offset, + offset, + isFullChunk, + chunk.cipherKey, + chunk.isGzipped )); offset = Math.min(chunk.stop, stop); } @@ -128,11 +159,13 @@ public class SeaweedRead { List<VisibleInterval> newVisibles, FilerProto.FileChunk chunk) { VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - true + chunk.getOffset(), + chunk.getOffset() + chunk.getSize(), + chunk.getFileId(), + chunk.getMtime(), + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsGzipped() ); // easy cases to speed up @@ -148,21 +181,25 @@ public class SeaweedRead { for (VisibleInterval v : visibles) { if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - false + v.start, + chunk.getOffset(), + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } long chunkStop = chunk.getOffset() + chunk.getSize(); if (v.start < chunkStop && chunkStop < v.stop) { newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false + chunkStop, + v.stop, + v.fileId, + v.modifiedTime, + false, + v.cipherKey, + v.isGzipped )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -209,24 +246,30 @@ public class SeaweedRead { public final long modifiedTime; public final String fileId; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "VisibleInterval{" + - "start=" + start + - ", stop=" + stop + - ", modifiedTime=" + modifiedTime + - ", fileId='" + fileId + '\'' + - ", isFullChunk=" + isFullChunk + - '}'; + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } @@ -236,24 +279,30 @@ public class SeaweedRead { public final long size; public final long logicOffset; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isGzipped; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isGzipped = isGzipped; } @Override public String toString() { return "ChunkView{" + - "fileId='" + fileId + '\'' + - ", offset=" + offset + - ", size=" + size + - ", logicOffset=" + logicOffset + - ", isFullChunk=" + isFullChunk + - '}'; + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isGzipped=" + isGzipped + + '}'; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index a7cede09f..18ec77b76 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,18 +1,23 @@ package seaweedfs.client; +import com.google.protobuf.ByteString; import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.DefaultHttpClient; import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; public class SeaweedWrite { + private static SecureRandom random = new SecureRandom(); + public static void writeData(FilerProto.Entry.Builder entry, final String replication, final FilerGrpcClient filerGrpcClient, @@ -20,53 +25,83 @@ public class SeaweedWrite { final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) - .setDataCenter("") - .setReplication("") - .setTtlSec(0) - .build()); + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection(filerGrpcClient.getCollection()) + .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) + .setDataCenter("") + .setTtlSec(0) + .build()); String fileId = response.getFileId(); String url = response.getUrl(); + String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength); + ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; + byte[] cipherKey = null; + if (filerGrpcClient.isCipher()) { + cipherKey = genCipherKey(); + cipherKeyString = ByteString.copyFrom(cipherKey); + } + + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); + + synchronized (entry) { + entry.addChunks(FilerProto.FileChunk.newBuilder() + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) + .setCipherKey(cipherKeyString) + ); + } - entry.addChunks(FilerProto.FileChunk.newBuilder() - .setFileId(fileId) - .setOffset(offset) - .setSize(bytesLength) - .setMtime(System.currentTimeMillis() / 10000L) - .setETag(etag) - ); + // cache fileId ~ bytes + SeaweedRead.chunkCache.setChunk(fileId, bytes); } public static void writeMeta(final FilerGrpcClient filerGrpcClient, final String parentDirectory, final FilerProto.Entry.Builder entry) { - filerGrpcClient.getBlockingStub().createEntry( - FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parentDirectory) - .setEntry(entry) - .build() - ); + synchronized (entry){ + filerGrpcClient.getBlockingStub().createEntry( + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parentDirectory) + .setEntry(entry) + .build() + ); + } } private static String multipartUpload(String targetUrl, + String auth, final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { - - CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build(); - - InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { + + HttpClient client = new DefaultHttpClient(); + + InputStream inputStream = null; + if (cipherKey == null || cipherKey.length == 0) { + inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + } else { + try { + byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey); + inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length); + } catch (Exception e) { + throw new IOException("fail to encrypt data", e); + } + } HttpPost post = new HttpPost(targetUrl); + if (auth != null && auth.length() != 0) { + post.addHeader("Authorization", "BEARER " + auth); + } post.setEntity(MultipartEntityBuilder.create() - .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) - .addBinaryBody("upload", inputStream) - .build()); + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("upload", inputStream) + .build()); try { HttpResponse response = client.execute(post); @@ -79,8 +114,17 @@ public class SeaweedWrite { return etag; } finally { - client.close(); + if (client instanceof Closeable) { + Closeable t = (Closeable) client; + t.close(); + } } } + + private static byte[] genCipherKey() { + byte[] b = new byte[32]; + random.nextBytes(b); + return b; + } } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6cd4df6b4..37121f29c 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package filer_pb; +option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"; option java_package = "seaweedfs.client"; option java_outer_classname = "FilerProto"; @@ -12,7 +13,7 @@ service SeaweedFiler { rpc LookupDirectoryEntry (LookupDirectoryEntryRequest) returns (LookupDirectoryEntryResponse) { } - rpc ListEntries (ListEntriesRequest) returns (ListEntriesResponse) { + rpc ListEntries (ListEntriesRequest) returns (stream ListEntriesResponse) { } rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) { @@ -21,9 +22,15 @@ service SeaweedFiler { rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) { } + rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) { + } + rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } + rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { + } + rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) { } @@ -36,6 +43,21 @@ service SeaweedFiler { rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { } + rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { + } + + rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { + } + + rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { + } + + rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { + } + + rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { + } + } ////////////////////////////////////////////////// @@ -58,7 +80,7 @@ message ListEntriesRequest { } message ListEntriesResponse { - repeated Entry entries = 1; + Entry entry = 1; } message Entry { @@ -69,19 +91,36 @@ message Entry { map<string, bytes> extended = 5; } +message FullEntry { + string dir = 1; + Entry entry = 2; +} + message EventNotification { Entry old_entry = 1; Entry new_entry = 2; bool delete_chunks = 3; + string new_parent_path = 4; + bool is_from_other_cluster = 5; } message FileChunk { - string file_id = 1; + string file_id = 1; // to be deprecated int64 offset = 2; uint64 size = 3; int64 mtime = 4; string e_tag = 5; - string source_file_id = 6; + string source_file_id = 6; // to be deprecated + FileId fid = 7; + FileId source_fid = 8; + bytes cipher_key = 9; + bool is_compressed = 10; +} + +message FileId { + uint32 volume_id = 1; + uint64 file_key = 2; + fixed32 cookie = 3; } message FuseAttributes { @@ -98,32 +137,58 @@ message FuseAttributes { string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; + bytes md5 = 14; } message CreateEntryRequest { string directory = 1; Entry entry = 2; + bool o_excl = 3; + bool is_from_other_cluster = 4; } message CreateEntryResponse { + string error = 1; } message UpdateEntryRequest { string directory = 1; Entry entry = 2; + bool is_from_other_cluster = 3; } message UpdateEntryResponse { } +message AppendToEntryRequest { + string directory = 1; + string entry_name = 2; + repeated FileChunk chunks = 3; +} +message AppendToEntryResponse { +} + message DeleteEntryRequest { string directory = 1; string name = 2; // bool is_directory = 3; bool is_delete_data = 4; bool is_recursive = 5; + bool ignore_recursive_error = 6; + bool is_from_other_cluster = 7; } message DeleteEntryResponse { + string error = 1; +} + +message AtomicRenameEntryRequest { + string old_directory = 1; + string old_name = 2; + string new_directory = 3; + string new_name = 4; +} + +message AtomicRenameEntryResponse { } message AssignVolumeRequest { @@ -132,6 +197,7 @@ message AssignVolumeRequest { string replication = 3; int32 ttl_sec = 4; string data_center = 5; + string parent_path = 6; } message AssignVolumeResponse { @@ -139,6 +205,10 @@ message AssignVolumeResponse { string url = 2; string public_url = 3; int32 count = 4; + string auth = 5; + string collection = 6; + string replication = 7; + string error = 8; } message LookupVolumeRequest { @@ -177,3 +247,53 @@ message StatisticsResponse { uint64 used_size = 5; uint64 file_count = 6; } + +message GetFilerConfigurationRequest { +} +message GetFilerConfigurationResponse { + repeated string masters = 1; + string replication = 2; + string collection = 3; + uint32 max_mb = 4; + string dir_buckets = 5; + bool cipher = 7; +} + +message SubscribeMetadataRequest { + string client_name = 1; + string path_prefix = 2; + int64 since_ns = 3; +} +message SubscribeMetadataResponse { + string directory = 1; + EventNotification event_notification = 2; + int64 ts_ns = 3; +} + +message LogEntry { + int64 ts_ns = 1; + int32 partition_key_hash = 2; + bytes data = 3; +} + +message KeepConnectedRequest { + string name = 1; + uint32 grpc_port = 2; + repeated string resources = 3; +} +message KeepConnectedResponse { +} + +message LocateBrokerRequest { + string resource = 1; +} +message LocateBrokerResponse { + bool found = 1; + // if found, send the exact address + // if not found, send the full list of existing brokers + message Resource { + string grpc_addresses = 1; + int32 resource_count = 2; + } + repeated Resource resources = 2; +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java new file mode 100644 index 000000000..7b5e53e19 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java @@ -0,0 +1,42 @@ +package seaweedfs.client; + +import org.junit.Test; + +import java.util.Base64; + +import static seaweedfs.client.SeaweedCipher.decrypt; +import static seaweedfs.client.SeaweedCipher.encrypt; + +public class SeaweedCipherTest { + + @Test + public void testSameAsGoImplemnetation() throws Exception { + byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + + @Test + public void testEncryptDecrypt() throws Exception { + byte[] secretKey = SeaweedCipher.genCipherKey(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java new file mode 100644 index 000000000..eaf17e5c6 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java @@ -0,0 +1,23 @@ +package seaweedfs.client; + +import java.util.List; + +public class SeaweedFilerTest { + public static void main(String[] args){ + + FilerClient filerClient = new FilerClient("localhost", 18888); + + List<FilerProto.Entry> entries = filerClient.listEntries("/"); + + for (FilerProto.Entry entry : entries) { + System.out.println(entry.toString()); + } + + filerClient.mkdirs("/new_folder", 0755); + filerClient.touch("/new_folder/new_empty_file", 0755); + filerClient.touch("/new_folder/new_empty_file2", 0755); + filerClient.rm("/new_folder/new_empty_file", false, true); + filerClient.rm("/new_folder", true, true); + + } +} |
