diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-07-16 11:13:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-07-16 11:13:23 +0800 |
| commit | d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch) | |
| tree | 8d760dcee4dfcb4404af90b7d5e64def4549b4cc /other/java | |
| parent | 01060c992591f412b0d5e180bde29991747a9462 (diff) | |
| parent | 5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff) | |
| download | seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.tar.xz seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.zip | |
keep update from original (#1)
keep update from original
Diffstat (limited to 'other/java')
8 files changed, 323 insertions, 195 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 1ea39863f..5882c726d 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -4,7 +4,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.0.5</version> + <version>1.1.0</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 63d0d8320..f4bd0944b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -51,12 +51,26 @@ public class FilerClient { } return createEntry( - parent, - newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() + parent, + newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() ); } + public boolean mv(String oldPath, String newPath) { + + Path oldPathObject = Paths.get(oldPath); + String oldParent = oldPathObject.getParent().toString(); + String oldName = oldPathObject.getFileName().toString(); + + Path newPathObject = Paths.get(newPath); + String newParent = newPathObject.getParent().toString(); + String newName = newPathObject.getFileName().toString(); + + return atomicRenameEntry(oldParent, oldName, newParent, newName); + + } + public boolean rm(String path, boolean isRecursive) { Path pathObject = Paths.get(path); @@ -64,10 +78,10 @@ public class FilerClient { String name = pathObject.getFileName().toString(); return deleteEntry( - parent, - name, - true, - isRecursive); + parent, + name, + true, + isRecursive); } public boolean touch(String path, int mode) { @@ -84,18 +98,18 @@ public class FilerClient { FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { return createEntry( - parent, - newFileEntry(name, mode, uid, gid, userName, groupNames).build() + parent, + newFileEntry(name, mode, uid, gid, userName, groupNames).build() ); } long now = System.currentTimeMillis() / 1000L; FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder() - .setMtime(now) - .setUid(uid) - .setGid(gid) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames)); + .setMtime(now) + .setUid(uid) + .setGid(gid) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames)); return updateEntry(parent, entry.toBuilder().setAttributes(attr).build()); } @@ -105,17 +119,17 @@ public class FilerClient { long now = System.currentTimeMillis() / 1000L; return FilerProto.Entry.newBuilder() - .setName(name) - .setIsDirectory(true) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setUid(uid) - .setGid(gid) - .setFileMode(mode | 1 << 31) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames))); + .setName(name) + .setIsDirectory(true) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode | 1 << 31) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); } public FilerProto.Entry.Builder newFileEntry(String name, int mode, @@ -124,17 +138,17 @@ public class FilerClient { long now = System.currentTimeMillis() / 1000L; return FilerProto.Entry.newBuilder() - .setName(name) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setUid(uid) - .setGid(gid) - .setFileMode(mode) - .setUserName(userName) - .clearGroupName() - .addAllGroupName(Arrays.asList(groupNames))); + .setName(name) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); } public List<FilerProto.Entry> listEntries(String path) { @@ -159,21 +173,27 @@ public class FilerClient { } public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) { - return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() - .setDirectory(path) - .setPrefix(entryPrefix) - .setStartFromFileName(lastEntryName) - .setLimit(limit) - .build()).getEntriesList(); + List<FilerProto.Entry> entries = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + .setDirectory(path) + .setPrefix(entryPrefix) + .setStartFromFileName(lastEntryName) + .setLimit(limit) + .build()).getEntriesList(); + List<FilerProto.Entry> fixedEntries = new ArrayList<>(entries.size()); + for (FilerProto.Entry entry : entries) { + fixedEntries.add(fixEntryAfterReading(entry)); + } + return fixedEntries; } public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - return filerGrpcClient.getBlockingStub().lookupDirectoryEntry( - FilerProto.LookupDirectoryEntryRequest.newBuilder() - .setDirectory(directory) - .setName(entryName) - .build()).getEntry(); + FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(directory) + .setName(entryName) + .build()).getEntry(); + return fixEntryAfterReading(entry); } catch (Exception e) { LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); return null; @@ -184,9 +204,9 @@ public class FilerClient { public boolean createEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; @@ -197,9 +217,9 @@ public class FilerClient { public boolean updateEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; @@ -210,11 +230,11 @@ public class FilerClient { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) { try { filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() - .setDirectory(parent) - .setName(entryName) - .setIsDeleteData(isDeleteFileChunk) - .setIsRecursive(isRecursive) - .build()); + .setDirectory(parent) + .setName(entryName) + .setIsDeleteData(isDeleteFileChunk) + .setIsRecursive(isRecursive) + .build()); } catch (Exception e) { LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e); return false; @@ -222,4 +242,39 @@ public class FilerClient { return true; } + public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { + try { + filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + .setOldDirectory(oldParent) + .setOldName(oldName) + .setNewDirectory(newParent) + .setNewName(newName) + .build()); + } catch (Exception e) { + LOG.warn("atomicRenameEntry {}/{} => {}/{}: {}", oldParent, oldName, newParent, newName, e); + return false; + } + return true; + } + + private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) { + if (entry.getChunksList().size() <= 0) { + return entry; + } + String fileId = entry.getChunks(0).getFileId(); + if (fileId != null && fileId.length() != 0) { + return entry; + } + FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); + entryBuilder.clearChunks(); + for (FilerProto.FileChunk chunk : entry.getChunksList()) { + FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); + FilerProto.FileId fid = chunk.getFid(); + fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); + chunkBuilder.setFileId(fileId); + entryBuilder.addChunks(chunkBuilder); + } + return entryBuilder.build(); + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 16b7c3249..c28c1dcf2 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -2,7 +2,14 @@ package seaweedfs.client; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import javax.net.ssl.SSLException; +import java.io.File; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -20,6 +27,16 @@ public class FilerGrpcClient { this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()); } + public FilerGrpcClient(String host, int grpcPort, + String caFilePath, + String clientCertFilePath, + String clientPrivateKeyFilePath) throws SSLException { + + this(NettyChannelBuilder.forAddress(host, grpcPort) + .negotiationType(NegotiationType.TLS) + .sslContext(buildSslContext(caFilePath,clientCertFilePath,clientPrivateKeyFilePath))); + } + public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { channel = channelBuilder.build(); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); @@ -42,4 +59,18 @@ public class FilerGrpcClient { public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { return futureStub; } + + private static SslContext buildSslContext(String trustCertCollectionFilePath, + String clientCertChainFilePath, + String clientPrivateKeyFilePath) throws SSLException { + SslContextBuilder builder = GrpcSslContexts.forClient(); + if (trustCertCollectionFilePath != null) { + builder.trustManager(new File(trustCertCollectionFilePath)); + } + if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) { + builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath)); + } + return builder.build(); + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index a7cede09f..15db87195 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -20,25 +20,26 @@ public class SeaweedWrite { final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) - .setDataCenter("") - .setReplication("") - .setTtlSec(0) - .build()); + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection("") + .setReplication(replication) + .setDataCenter("") + .setReplication("") + .setTtlSec(0) + .build()); String fileId = response.getFileId(); String url = response.getUrl(); + String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength); + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength); entry.addChunks(FilerProto.FileChunk.newBuilder() - .setFileId(fileId) - .setOffset(offset) - .setSize(bytesLength) - .setMtime(System.currentTimeMillis() / 10000L) - .setETag(etag) + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) ); } @@ -46,14 +47,15 @@ public class SeaweedWrite { public static void writeMeta(final FilerGrpcClient filerGrpcClient, final String parentDirectory, final FilerProto.Entry.Builder entry) { filerGrpcClient.getBlockingStub().createEntry( - FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parentDirectory) - .setEntry(entry) - .build() + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parentDirectory) + .setEntry(entry) + .build() ); } private static String multipartUpload(String targetUrl, + String auth, final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { @@ -62,11 +64,14 @@ public class SeaweedWrite { InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); HttpPost post = new HttpPost(targetUrl); + if (auth != null && auth.length() != 0) { + post.addHeader("Authorization", "BEARER " + auth); + } post.setEntity(MultipartEntityBuilder.create() - .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) - .addBinaryBody("upload", inputStream) - .build()); + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("upload", inputStream) + .build()); try { HttpResponse response = client.execute(post); diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 6cd4df6b4..d72bced12 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -24,6 +24,9 @@ service SeaweedFiler { rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } + rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { + } + rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) { } @@ -36,6 +39,9 @@ service SeaweedFiler { rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { } + rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { + } + } ////////////////////////////////////////////////// @@ -69,19 +75,33 @@ message Entry { map<string, bytes> extended = 5; } +message FullEntry { + string dir = 1; + Entry entry = 2; +} + message EventNotification { Entry old_entry = 1; Entry new_entry = 2; bool delete_chunks = 3; + string new_parent_path = 4; } message FileChunk { - string file_id = 1; + string file_id = 1; // to be deprecated int64 offset = 2; uint64 size = 3; int64 mtime = 4; string e_tag = 5; - string source_file_id = 6; + string source_file_id = 6; // to be deprecated + FileId fid = 7; + FileId source_fid = 8; +} + +message FileId { + uint32 volume_id = 1; + uint64 file_key = 2; + fixed32 cookie = 3; } message FuseAttributes { @@ -126,6 +146,16 @@ message DeleteEntryRequest { message DeleteEntryResponse { } +message AtomicRenameEntryRequest { + string old_directory = 1; + string old_name = 2; + string new_directory = 3; + string new_name = 4; +} + +message AtomicRenameEntryResponse { +} + message AssignVolumeRequest { int32 count = 1; string collection = 2; @@ -139,6 +169,7 @@ message AssignVolumeResponse { string url = 2; string public_url = 3; int32 count = 4; + string auth = 5; } message LookupVolumeRequest { @@ -177,3 +208,12 @@ message StatisticsResponse { uint64 used_size = 5; uint64 file_count = 6; } + +message GetFilerConfigurationRequest { +} +message GetFilerConfigurationResponse { + repeated string masters = 1; + string replication = 2; + string collection = 3; + uint32 max_mb = 4; +} diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml index a0cab8752..6a1cd897f 100644 --- a/other/java/hdfs/pom.xml +++ b/other/java/hdfs/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.0.5</seaweedfs.client.version> + <seaweedfs.client.version>1.1.0</seaweedfs.client.version> <hadoop.version>3.1.1</hadoop.version> </properties> diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 2a0ef78af..453924cf7 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -34,6 +34,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca"; + public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key"; + public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert"; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); private static int BUFFER_SIZE = 16 * 1024 * 1024; @@ -72,7 +75,17 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { setConf(conf); this.uri = uri; - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); + if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0 + && conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0 + && conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) { + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, + conf.get(FS_SEAWEED_GRPC_CA), + conf.get(FS_SEAWEED_GRPC_CLIENT_CERT), + conf.get(FS_SEAWEED_GRPC_CLIENT_KEY)); + } else { + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); + } + } @Override @@ -206,8 +219,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); return seaweedFileSystemStore.createDirectory(path, currentUser, - fsPermission == null ? FsPermission.getDirDefault() : fsPermission, - FsPermission.getUMask(getConf())); + fsPermission == null ? FsPermission.getDirDefault() : fsPermission, + FsPermission.getUMask(getConf())); } @@ -238,7 +251,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { */ @Override public void setOwner(Path path, final String owner, final String group) - throws IOException { + throws IOException { LOG.debug("setOwner path: {}", path); path = qualify(path); @@ -271,54 +284,55 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { /** * Concat existing files together. - * @param trg the path to the target destination. + * + * @param trg the path to the target destination. * @param psrcs the paths to the sources to use for the concatenation. - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default). + * (default). */ @Override - public void concat(final Path trg, final Path [] psrcs) throws IOException { + public void concat(final Path trg, final Path[] psrcs) throws IOException { throw new UnsupportedOperationException("Not implemented by the " + - getClass().getSimpleName() + " FileSystem implementation"); + getClass().getSimpleName() + " FileSystem implementation"); } /** * Truncate the file in the indicated path to the indicated size. * <ul> - * <li>Fails if path is a directory.</li> - * <li>Fails if path does not exist.</li> - * <li>Fails if path is not closed.</li> - * <li>Fails if new size is greater than current size.</li> + * <li>Fails if path is a directory.</li> + * <li>Fails if path does not exist.</li> + * <li>Fails if path is not closed.</li> + * <li>Fails if new size is greater than current size.</li> * </ul> - * @param f The path to the file to be truncated - * @param newLength The size the file is to be truncated to * + * @param f The path to the file to be truncated + * @param newLength The size the file is to be truncated to * @return <code>true</code> if the file has been truncated to the desired * <code>newLength</code> and is immediately available to be reused for * write operations such as <code>append</code>, or * <code>false</code> if a background process of adjusting the length of * the last block has been started, and clients should wait for it to * complete before proceeding with further file updates. - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default). + * (default). */ @Override public boolean truncate(Path f, long newLength) throws IOException { throw new UnsupportedOperationException("Not implemented by the " + - getClass().getSimpleName() + " FileSystem implementation"); + getClass().getSimpleName() + " FileSystem implementation"); } @Override public void createSymlink(final Path target, final Path link, final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, - IOException { + FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, UnsupportedFileSystemException, + IOException { // Supporting filesystems should override this method throw new UnsupportedOperationException( - "Filesystem does not support symlinks!"); + "Filesystem does not support symlinks!"); } public boolean supportsSymlinks() { @@ -327,48 +341,51 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { /** * Create a snapshot. - * @param path The directory where snapshots will be taken. + * + * @param path The directory where snapshots will be taken. * @param snapshotName The name of the snapshot * @return the snapshot path. - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported */ @Override public Path createSnapshot(Path path, String snapshotName) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support createSnapshot"); + + " doesn't support createSnapshot"); } /** * Rename a snapshot. - * @param path The directory path where the snapshot was taken + * + * @param path The directory path where the snapshot was taken * @param snapshotOldName Old name of the snapshot * @param snapshotNewName New name of the snapshot - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void renameSnapshot(Path path, String snapshotOldName, String snapshotNewName) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support renameSnapshot"); + + " doesn't support renameSnapshot"); } /** * Delete a snapshot of a directory. - * @param path The directory that the to-be-deleted snapshot belongs to + * + * @param path The directory that the to-be-deleted snapshot belongs to * @param snapshotName The name of the snapshot - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void deleteSnapshot(Path path, String snapshotName) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support deleteSnapshot"); + + " doesn't support deleteSnapshot"); } /** @@ -377,49 +394,49 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * ACL entries that are not specified in this call are retained without * changes. (Modifications are merged into the current ACL.) * - * @param path Path to modify + * @param path Path to modify * @param aclSpec List<AclEntry> describing modifications - * @throws IOException if an ACL could not be modified + * @throws IOException if an ACL could not be modified * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void modifyAclEntries(Path path, List<AclEntry> aclSpec) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support modifyAclEntries"); + + " doesn't support modifyAclEntries"); } /** * Removes ACL entries from files and directories. Other ACL entries are * retained. * - * @param path Path to modify + * @param path Path to modify * @param aclSpec List describing entries to remove - * @throws IOException if an ACL could not be modified + * @throws IOException if an ACL could not be modified * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void removeAclEntries(Path path, List<AclEntry> aclSpec) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeAclEntries"); + + " doesn't support removeAclEntries"); } /** * Removes all default ACL entries from files and directories. * * @param path Path to modify - * @throws IOException if an ACL could not be modified + * @throws IOException if an ACL could not be modified * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void removeDefaultAcl(Path path) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeDefaultAcl"); + + " doesn't support removeDefaultAcl"); } /** @@ -428,32 +445,32 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * bits. * * @param path Path to modify - * @throws IOException if an ACL could not be removed + * @throws IOException if an ACL could not be removed * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void removeAcl(Path path) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeAcl"); + + " doesn't support removeAcl"); } /** * Fully replaces ACL of files and directories, discarding all existing * entries. * - * @param path Path to modify + * @param path Path to modify * @param aclSpec List describing modifications, which must include entries - * for user, group, and others for compatibility with permission bits. - * @throws IOException if an ACL could not be modified + * for user, group, and others for compatibility with permission bits. + * @throws IOException if an ACL could not be modified * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support setAcl"); + + " doesn't support setAcl"); } /** @@ -461,14 +478,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * * @param path Path to get * @return AclStatus describing the ACL of the file or directory - * @throws IOException if an ACL could not be read + * @throws IOException if an ACL could not be read * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public AclStatus getAclStatus(Path path) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getAclStatus"); + + " doesn't support getAclStatus"); } /** @@ -478,19 +495,19 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * <p> * Refer to the HDFS extended attributes user documentation for details. * - * @param path Path to modify - * @param name xattr name. + * @param path Path to modify + * @param name xattr name. * @param value xattr value. - * @param flag xattr set flag - * @throws IOException IO failure + * @param flag xattr set flag + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support setXAttr"); + + " doesn't support setXAttr"); } /** @@ -503,14 +520,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * @param path Path to get extended attribute * @param name xattr name. * @return byte[] xattr value. - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public byte[] getXAttr(Path path, String name) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttr"); + + " doesn't support getXAttr"); } /** @@ -522,14 +539,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * * @param path Path to get extended attributes * @return Map describing the XAttrs of the file or directory - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public Map<String, byte[]> getXAttrs(Path path) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttrs"); + + " doesn't support getXAttrs"); } /** @@ -539,18 +556,18 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * <p> * Refer to the HDFS extended attributes user documentation for details. * - * @param path Path to get extended attributes + * @param path Path to get extended attributes * @param names XAttr names. * @return Map describing the XAttrs of the file or directory - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public Map<String, byte[]> getXAttrs(Path path, List<String> names) - throws IOException { + throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttrs"); + + " doesn't support getXAttrs"); } /** @@ -562,14 +579,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * * @param path Path to get extended attributes * @return List{@literal <String>} of the XAttr names of the file or directory - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public List<String> listXAttrs(Path path) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support listXAttrs"); + + " doesn't support listXAttrs"); } /** @@ -581,14 +598,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { * * @param path Path to remove extended attribute * @param name xattr name - * @throws IOException IO failure + * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). + * (default outcome). */ @Override public void removeXAttr(Path path, String name) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeXAttr"); + + " doesn't support removeXAttr"); } } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 27678e615..643467898 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -12,6 +12,7 @@ import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedRead; +import javax.net.ssl.SSLException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -33,6 +34,13 @@ public class SeaweedFileSystemStore { filerClient = new FilerClient(filerGrpcClient); } + public SeaweedFileSystemStore(String host, int port, + String caFile, String clientCertFile, String clientKeyFile) throws SSLException { + int grpcPort = 10000 + port; + filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile); + filerClient = new FilerClient(filerGrpcClient); + } + public static String getParentDirectory(Path path) { return path.isRoot() ? "/" : path.getParent().toUri().getPath(); } @@ -143,35 +151,7 @@ public class SeaweedFileSystemStore { LOG.warn("rename non-existing source: {}", source); return; } - LOG.warn("rename moveEntry source: {}", source); - moveEntry(source.getParent(), entry, destination); - } - - private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) { - - LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination); - - FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName()); - boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build()); - - if (!isDirectoryCreated) { - return false; - } - - if (entry.getIsDirectory()) { - Path entryPath = new Path(oldParent, entry.getName()); - List<FilerProto.Entry> entries = filerClient.listEntries(entryPath.toUri().getPath()); - for (FilerProto.Entry ent : entries) { - boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName())); - if (!isSucess) { - return false; - } - } - } - - return filerClient.deleteEntry( - oldParent.toUri().getPath(), entry.getName(), false, false); - + filerClient.mv(source.toUri().getPath(), destination.toUri().getPath()); } public OutputStream createFile(final Path path, |
