aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2019-07-16 11:13:23 +0800
committerGitHub <noreply@github.com>2019-07-16 11:13:23 +0800
commitd19bbee98d89ec6cd603572bd9c5d55749610e61 (patch)
tree8d760dcee4dfcb4404af90b7d5e64def4549b4cc /other/java
parent01060c992591f412b0d5e180bde29991747a9462 (diff)
parent5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff)
downloadseaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.tar.xz
seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.zip
keep update from original (#1)
keep update from original
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml2
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java171
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java31
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java45
-rw-r--r--other/java/client/src/main/proto/filer.proto44
-rw-r--r--other/java/hdfs/pom.xml2
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java185
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java38
8 files changed, 323 insertions, 195 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 1ea39863f..5882c726d 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.0.5</version>
+ <version>1.1.0</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 63d0d8320..f4bd0944b 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -51,12 +51,26 @@ public class FilerClient {
}
return createEntry(
- parent,
- newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
+ parent,
+ newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
);
}
+ public boolean mv(String oldPath, String newPath) {
+
+ Path oldPathObject = Paths.get(oldPath);
+ String oldParent = oldPathObject.getParent().toString();
+ String oldName = oldPathObject.getFileName().toString();
+
+ Path newPathObject = Paths.get(newPath);
+ String newParent = newPathObject.getParent().toString();
+ String newName = newPathObject.getFileName().toString();
+
+ return atomicRenameEntry(oldParent, oldName, newParent, newName);
+
+ }
+
public boolean rm(String path, boolean isRecursive) {
Path pathObject = Paths.get(path);
@@ -64,10 +78,10 @@ public class FilerClient {
String name = pathObject.getFileName().toString();
return deleteEntry(
- parent,
- name,
- true,
- isRecursive);
+ parent,
+ name,
+ true,
+ isRecursive);
}
public boolean touch(String path, int mode) {
@@ -84,18 +98,18 @@ public class FilerClient {
FilerProto.Entry entry = lookupEntry(parent, name);
if (entry == null) {
return createEntry(
- parent,
- newFileEntry(name, mode, uid, gid, userName, groupNames).build()
+ parent,
+ newFileEntry(name, mode, uid, gid, userName, groupNames).build()
);
}
long now = System.currentTimeMillis() / 1000L;
FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder()
- .setMtime(now)
- .setUid(uid)
- .setGid(gid)
- .setUserName(userName)
- .clearGroupName()
- .addAllGroupName(Arrays.asList(groupNames));
+ .setMtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames));
return updateEntry(parent, entry.toBuilder().setAttributes(attr).build());
}
@@ -105,17 +119,17 @@ public class FilerClient {
long now = System.currentTimeMillis() / 1000L;
return FilerProto.Entry.newBuilder()
- .setName(name)
- .setIsDirectory(true)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setMtime(now)
- .setCrtime(now)
- .setUid(uid)
- .setGid(gid)
- .setFileMode(mode | 1 << 31)
- .setUserName(userName)
- .clearGroupName()
- .addAllGroupName(Arrays.asList(groupNames)));
+ .setName(name)
+ .setIsDirectory(true)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setFileMode(mode | 1 << 31)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames)));
}
public FilerProto.Entry.Builder newFileEntry(String name, int mode,
@@ -124,17 +138,17 @@ public class FilerClient {
long now = System.currentTimeMillis() / 1000L;
return FilerProto.Entry.newBuilder()
- .setName(name)
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setMtime(now)
- .setCrtime(now)
- .setUid(uid)
- .setGid(gid)
- .setFileMode(mode)
- .setUserName(userName)
- .clearGroupName()
- .addAllGroupName(Arrays.asList(groupNames)));
+ .setName(name)
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setFileMode(mode)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames)));
}
public List<FilerProto.Entry> listEntries(String path) {
@@ -159,21 +173,27 @@ public class FilerClient {
}
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
- return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
- .setDirectory(path)
- .setPrefix(entryPrefix)
- .setStartFromFileName(lastEntryName)
- .setLimit(limit)
- .build()).getEntriesList();
+ List<FilerProto.Entry> entries = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ .setDirectory(path)
+ .setPrefix(entryPrefix)
+ .setStartFromFileName(lastEntryName)
+ .setLimit(limit)
+ .build()).getEntriesList();
+ List<FilerProto.Entry> fixedEntries = new ArrayList<>(entries.size());
+ for (FilerProto.Entry entry : entries) {
+ fixedEntries.add(fixEntryAfterReading(entry));
+ }
+ return fixedEntries;
}
public FilerProto.Entry lookupEntry(String directory, String entryName) {
try {
- return filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
- FilerProto.LookupDirectoryEntryRequest.newBuilder()
- .setDirectory(directory)
- .setName(entryName)
- .build()).getEntry();
+ FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
+ FilerProto.LookupDirectoryEntryRequest.newBuilder()
+ .setDirectory(directory)
+ .setName(entryName)
+ .build()).getEntry();
+ return fixEntryAfterReading(entry);
} catch (Exception e) {
LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
return null;
@@ -184,9 +204,9 @@ public class FilerClient {
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
@@ -197,9 +217,9 @@ public class FilerClient {
public boolean updateEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
@@ -210,11 +230,11 @@ public class FilerClient {
public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) {
try {
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
- .setDirectory(parent)
- .setName(entryName)
- .setIsDeleteData(isDeleteFileChunk)
- .setIsRecursive(isRecursive)
- .build());
+ .setDirectory(parent)
+ .setName(entryName)
+ .setIsDeleteData(isDeleteFileChunk)
+ .setIsRecursive(isRecursive)
+ .build());
} catch (Exception e) {
LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e);
return false;
@@ -222,4 +242,39 @@ public class FilerClient {
return true;
}
+ public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
+ try {
+ filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
+ .setOldDirectory(oldParent)
+ .setOldName(oldName)
+ .setNewDirectory(newParent)
+ .setNewName(newName)
+ .build());
+ } catch (Exception e) {
+ LOG.warn("atomicRenameEntry {}/{} => {}/{}: {}", oldParent, oldName, newParent, newName, e);
+ return false;
+ }
+ return true;
+ }
+
+ private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ FilerProto.FileId fid = chunk.getFid();
+ fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ chunkBuilder.setFileId(fileId);
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
+ }
+
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 16b7c3249..c28c1dcf2 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -2,7 +2,14 @@ package seaweedfs.client;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
+import javax.net.ssl.SSLException;
+import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -20,6 +27,16 @@ public class FilerGrpcClient {
this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext());
}
+ public FilerGrpcClient(String host, int grpcPort,
+ String caFilePath,
+ String clientCertFilePath,
+ String clientPrivateKeyFilePath) throws SSLException {
+
+ this(NettyChannelBuilder.forAddress(host, grpcPort)
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(buildSslContext(caFilePath,clientCertFilePath,clientPrivateKeyFilePath)));
+ }
+
public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
@@ -42,4 +59,18 @@ public class FilerGrpcClient {
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
return futureStub;
}
+
+ private static SslContext buildSslContext(String trustCertCollectionFilePath,
+ String clientCertChainFilePath,
+ String clientPrivateKeyFilePath) throws SSLException {
+ SslContextBuilder builder = GrpcSslContexts.forClient();
+ if (trustCertCollectionFilePath != null) {
+ builder.trustManager(new File(trustCertCollectionFilePath));
+ }
+ if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) {
+ builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath));
+ }
+ return builder.build();
+ }
+
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index a7cede09f..15db87195 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -20,25 +20,26 @@ public class SeaweedWrite {
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
- FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection("")
- .setReplication(replication)
- .setDataCenter("")
- .setReplication("")
- .setTtlSec(0)
- .build());
+ FilerProto.AssignVolumeRequest.newBuilder()
+ .setCollection("")
+ .setReplication(replication)
+ .setDataCenter("")
+ .setReplication("")
+ .setTtlSec(0)
+ .build());
String fileId = response.getFileId();
String url = response.getUrl();
+ String auth = response.getAuth();
String targetUrl = String.format("http://%s/%s", url, fileId);
- String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength);
+ String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength);
entry.addChunks(FilerProto.FileChunk.newBuilder()
- .setFileId(fileId)
- .setOffset(offset)
- .setSize(bytesLength)
- .setMtime(System.currentTimeMillis() / 10000L)
- .setETag(etag)
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+ .setMtime(System.currentTimeMillis() / 10000L)
+ .setETag(etag)
);
}
@@ -46,14 +47,15 @@ public class SeaweedWrite {
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory, final FilerProto.Entry.Builder entry) {
filerGrpcClient.getBlockingStub().createEntry(
- FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parentDirectory)
- .setEntry(entry)
- .build()
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parentDirectory)
+ .setEntry(entry)
+ .build()
);
}
private static String multipartUpload(String targetUrl,
+ String auth,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
@@ -62,11 +64,14 @@ public class SeaweedWrite {
InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
HttpPost post = new HttpPost(targetUrl);
+ if (auth != null && auth.length() != 0) {
+ post.addHeader("Authorization", "BEARER " + auth);
+ }
post.setEntity(MultipartEntityBuilder.create()
- .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
- .addBinaryBody("upload", inputStream)
- .build());
+ .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+ .addBinaryBody("upload", inputStream)
+ .build());
try {
HttpResponse response = client.execute(post);
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 6cd4df6b4..d72bced12 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -24,6 +24,9 @@ service SeaweedFiler {
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
}
+ rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
+ }
+
rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) {
}
@@ -36,6 +39,9 @@ service SeaweedFiler {
rpc Statistics (StatisticsRequest) returns (StatisticsResponse) {
}
+ rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -69,19 +75,33 @@ message Entry {
map<string, bytes> extended = 5;
}
+message FullEntry {
+ string dir = 1;
+ Entry entry = 2;
+}
+
message EventNotification {
Entry old_entry = 1;
Entry new_entry = 2;
bool delete_chunks = 3;
+ string new_parent_path = 4;
}
message FileChunk {
- string file_id = 1;
+ string file_id = 1; // to be deprecated
int64 offset = 2;
uint64 size = 3;
int64 mtime = 4;
string e_tag = 5;
- string source_file_id = 6;
+ string source_file_id = 6; // to be deprecated
+ FileId fid = 7;
+ FileId source_fid = 8;
+}
+
+message FileId {
+ uint32 volume_id = 1;
+ uint64 file_key = 2;
+ fixed32 cookie = 3;
}
message FuseAttributes {
@@ -126,6 +146,16 @@ message DeleteEntryRequest {
message DeleteEntryResponse {
}
+message AtomicRenameEntryRequest {
+ string old_directory = 1;
+ string old_name = 2;
+ string new_directory = 3;
+ string new_name = 4;
+}
+
+message AtomicRenameEntryResponse {
+}
+
message AssignVolumeRequest {
int32 count = 1;
string collection = 2;
@@ -139,6 +169,7 @@ message AssignVolumeResponse {
string url = 2;
string public_url = 3;
int32 count = 4;
+ string auth = 5;
}
message LookupVolumeRequest {
@@ -177,3 +208,12 @@ message StatisticsResponse {
uint64 used_size = 5;
uint64 file_count = 6;
}
+
+message GetFilerConfigurationRequest {
+}
+message GetFilerConfigurationResponse {
+ repeated string masters = 1;
+ string replication = 2;
+ string collection = 3;
+ uint32 max_mb = 4;
+}
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index a0cab8752..6a1cd897f 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.0.5</seaweedfs.client.version>
+ <seaweedfs.client.version>1.1.0</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 2a0ef78af..453924cf7 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -34,6 +34,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+ public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca";
+ public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key";
+ public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
private static int BUFFER_SIZE = 16 * 1024 * 1024;
@@ -72,7 +75,17 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
setConf(conf);
this.uri = uri;
- seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
+ if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0
+ && conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0
+ && conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) {
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port,
+ conf.get(FS_SEAWEED_GRPC_CA),
+ conf.get(FS_SEAWEED_GRPC_CLIENT_CERT),
+ conf.get(FS_SEAWEED_GRPC_CLIENT_KEY));
+ } else {
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
+ }
+
}
@Override
@@ -206,8 +219,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
- fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
- FsPermission.getUMask(getConf()));
+ fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
+ FsPermission.getUMask(getConf()));
}
@@ -238,7 +251,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*/
@Override
public void setOwner(Path path, final String owner, final String group)
- throws IOException {
+ throws IOException {
LOG.debug("setOwner path: {}", path);
path = qualify(path);
@@ -271,54 +284,55 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/**
* Concat existing files together.
- * @param trg the path to the target destination.
+ *
+ * @param trg the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default).
+ * (default).
*/
@Override
- public void concat(final Path trg, final Path [] psrcs) throws IOException {
+ public void concat(final Path trg, final Path[] psrcs) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
+ getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Truncate the file in the indicated path to the indicated size.
* <ul>
- * <li>Fails if path is a directory.</li>
- * <li>Fails if path does not exist.</li>
- * <li>Fails if path is not closed.</li>
- * <li>Fails if new size is greater than current size.</li>
+ * <li>Fails if path is a directory.</li>
+ * <li>Fails if path does not exist.</li>
+ * <li>Fails if path is not closed.</li>
+ * <li>Fails if new size is greater than current size.</li>
* </ul>
- * @param f The path to the file to be truncated
- * @param newLength The size the file is to be truncated to
*
+ * @param f The path to the file to be truncated
+ * @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default).
+ * (default).
*/
@Override
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
+ getClass().getSimpleName() + " FileSystem implementation");
}
@Override
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
- IOException {
+ FileAlreadyExistsException, FileNotFoundException,
+ ParentNotDirectoryException, UnsupportedFileSystemException,
+ IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
- "Filesystem does not support symlinks!");
+ "Filesystem does not support symlinks!");
}
public boolean supportsSymlinks() {
@@ -327,48 +341,51 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/**
* Create a snapshot.
- * @param path The directory where snapshots will be taken.
+ *
+ * @param path The directory where snapshots will be taken.
* @param snapshotName The name of the snapshot
* @return the snapshot path.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
*/
@Override
public Path createSnapshot(Path path, String snapshotName)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support createSnapshot");
+ + " doesn't support createSnapshot");
}
/**
* Rename a snapshot.
- * @param path The directory path where the snapshot was taken
+ *
+ * @param path The directory path where the snapshot was taken
* @param snapshotOldName Old name of the snapshot
* @param snapshotNewName New name of the snapshot
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support renameSnapshot");
+ + " doesn't support renameSnapshot");
}
/**
* Delete a snapshot of a directory.
- * @param path The directory that the to-be-deleted snapshot belongs to
+ *
+ * @param path The directory that the to-be-deleted snapshot belongs to
* @param snapshotName The name of the snapshot
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void deleteSnapshot(Path path, String snapshotName)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support deleteSnapshot");
+ + " doesn't support deleteSnapshot");
}
/**
@@ -377,49 +394,49 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
- * @param path Path to modify
+ * @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
- * @throws IOException if an ACL could not be modified
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support modifyAclEntries");
+ + " doesn't support modifyAclEntries");
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
- * @param path Path to modify
+ * @param path Path to modify
* @param aclSpec List describing entries to remove
- * @throws IOException if an ACL could not be modified
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAclEntries");
+ + " doesn't support removeAclEntries");
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
- * @throws IOException if an ACL could not be modified
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeDefaultAcl(Path path)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeDefaultAcl");
+ + " doesn't support removeDefaultAcl");
}
/**
@@ -428,32 +445,32 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* bits.
*
* @param path Path to modify
- * @throws IOException if an ACL could not be removed
+ * @throws IOException if an ACL could not be removed
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeAcl(Path path)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAcl");
+ + " doesn't support removeAcl");
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
- * @param path Path to modify
+ * @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
- * for user, group, and others for compatibility with permission bits.
- * @throws IOException if an ACL could not be modified
+ * for user, group, and others for compatibility with permission bits.
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setAcl");
+ + " doesn't support setAcl");
}
/**
@@ -461,14 +478,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to get
* @return AclStatus describing the ACL of the file or directory
- * @throws IOException if an ACL could not be read
+ * @throws IOException if an ACL could not be read
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public AclStatus getAclStatus(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getAclStatus");
+ + " doesn't support getAclStatus");
}
/**
@@ -478,19 +495,19 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
- * @param path Path to modify
- * @param name xattr name.
+ * @param path Path to modify
+ * @param name xattr name.
* @param value xattr value.
- * @param flag xattr set flag
- * @throws IOException IO failure
+ * @param flag xattr set flag
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setXAttr");
+ + " doesn't support setXAttr");
}
/**
@@ -503,14 +520,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* @param path Path to get extended attribute
* @param name xattr name.
* @return byte[] xattr value.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttr");
+ + " doesn't support getXAttr");
}
/**
@@ -522,14 +539,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to get extended attributes
* @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
+ + " doesn't support getXAttrs");
}
/**
@@ -539,18 +556,18 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
- * @param path Path to get extended attributes
+ * @param path Path to get extended attributes
* @param names XAttr names.
* @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
+ + " doesn't support getXAttrs");
}
/**
@@ -562,14 +579,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to get extended attributes
* @return List{@literal <String>} of the XAttr names of the file or directory
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public List<String> listXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support listXAttrs");
+ + " doesn't support listXAttrs");
}
/**
@@ -581,14 +598,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to remove extended attribute
* @param name xattr name
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeXAttr");
+ + " doesn't support removeXAttr");
}
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 27678e615..643467898 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -12,6 +12,7 @@ import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedRead;
+import javax.net.ssl.SSLException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -33,6 +34,13 @@ public class SeaweedFileSystemStore {
filerClient = new FilerClient(filerGrpcClient);
}
+ public SeaweedFileSystemStore(String host, int port,
+ String caFile, String clientCertFile, String clientKeyFile) throws SSLException {
+ int grpcPort = 10000 + port;
+ filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile);
+ filerClient = new FilerClient(filerGrpcClient);
+ }
+
public static String getParentDirectory(Path path) {
return path.isRoot() ? "/" : path.getParent().toUri().getPath();
}
@@ -143,35 +151,7 @@ public class SeaweedFileSystemStore {
LOG.warn("rename non-existing source: {}", source);
return;
}
- LOG.warn("rename moveEntry source: {}", source);
- moveEntry(source.getParent(), entry, destination);
- }
-
- private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) {
-
- LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination);
-
- FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
- boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build());
-
- if (!isDirectoryCreated) {
- return false;
- }
-
- if (entry.getIsDirectory()) {
- Path entryPath = new Path(oldParent, entry.getName());
- List<FilerProto.Entry> entries = filerClient.listEntries(entryPath.toUri().getPath());
- for (FilerProto.Entry ent : entries) {
- boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName()));
- if (!isSucess) {
- return false;
- }
- }
- }
-
- return filerClient.deleteEntry(
- oldParent.toUri().getPath(), entry.getName(), false, false);
-
+ filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
}
public OutputStream createFile(final Path path,