aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml7
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java207
-rw-r--r--other/java/client/src/main/proto/filer.proto2
-rw-r--r--other/java/hdfs/pom.xml24
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java339
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java107
6 files changed, 606 insertions, 80 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 74404823b..a3c56856e 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.0</version>
+ <version>1.0.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -45,6 +45,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
</dependencies>
<distributionManagement>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
new file mode 100644
index 000000000..8414ee303
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -0,0 +1,207 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+public class FilerClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
+
+ private FilerGrpcClient filerGrpcClient;
+
+ public FilerClient(String host, int grpcPort) {
+ filerGrpcClient = new FilerGrpcClient(host, grpcPort);
+ }
+
+ public FilerClient(FilerGrpcClient filerGrpcClient) {
+ this.filerGrpcClient = filerGrpcClient;
+ }
+
+ public boolean mkdirs(String path, int mode) {
+ String currentUser = System.getProperty("user.name");
+ return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
+ }
+
+ public boolean mkdirs(String path, int mode, String userName, String[] groupNames) {
+ return mkdirs(path, mode, 0, 0, userName, groupNames);
+ }
+
+ public boolean mkdirs(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
+
+ Path pathObject = Paths.get(path);
+ String parent = pathObject.getParent().toString();
+ String name = pathObject.getFileName().toString();
+
+ if ("/".equals(parent)) {
+ return true;
+ }
+
+ mkdirs(parent, mode, uid, gid, userName, groupNames);
+
+ FilerProto.Entry existingEntry = lookupEntry(parent, name);
+
+ if (existingEntry != null) {
+ return true;
+ }
+
+ return createEntry(
+ parent,
+ newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
+ );
+
+ }
+
+ public boolean rm(String path, boolean isRecursive) {
+
+ Path pathObject = Paths.get(path);
+ String parent = pathObject.getParent().toString();
+ String name = pathObject.getFileName().toString();
+
+ return deleteEntry(
+ parent,
+ name,
+ true,
+ isRecursive);
+ }
+
+ public boolean touch(String path, int mode) {
+ String currentUser = System.getProperty("user.name");
+ return touch(path, mode, 0, 0, currentUser, new String[]{});
+ }
+
+ public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
+
+ Path pathObject = Paths.get(path);
+ String parent = pathObject.getParent().toString();
+ String name = pathObject.getFileName().toString();
+
+ FilerProto.Entry entry = lookupEntry(parent, name);
+ if (entry == null) {
+ return createEntry(
+ parent,
+ newFileEntry(name, mode, uid, gid, userName, groupNames).build()
+ );
+ }
+ long now = System.currentTimeMillis() / 1000L;
+ FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder()
+ .setMtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames));
+ return updateEntry(parent, entry.toBuilder().setAttributes(attr).build());
+ }
+
+ public FilerProto.Entry.Builder newDirectoryEntry(String name, int mode,
+ int uid, int gid, String userName, String[] groupNames) {
+
+ long now = System.currentTimeMillis() / 1000L;
+
+ return FilerProto.Entry.newBuilder()
+ .setName(name)
+ .setIsDirectory(true)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setFileMode(mode | 1 << 31)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames)));
+ }
+
+ public FilerProto.Entry.Builder newFileEntry(String name, int mode,
+ int uid, int gid, String userName, String[] groupNames) {
+
+ long now = System.currentTimeMillis() / 1000L;
+
+ return FilerProto.Entry.newBuilder()
+ .setName(name)
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setFileMode(mode)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames)));
+ }
+
+ public List<FilerProto.Entry> listEntries(String path) {
+ return listEntries(path, "", "", 100000);
+ }
+
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
+ return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ .setDirectory(path)
+ .setPrefix(entryPrefix)
+ .setStartFromFileName(lastEntryName)
+ .setLimit(limit)
+ .build()).getEntriesList();
+ }
+
+ public FilerProto.Entry lookupEntry(String directory, String entryName) {
+ try {
+ return filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
+ FilerProto.LookupDirectoryEntryRequest.newBuilder()
+ .setDirectory(directory)
+ .setName(entryName)
+ .build()).getEntry();
+ } catch (Exception e) {
+ LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
+ return null;
+ }
+ }
+
+
+ public boolean createEntry(String parent, FilerProto.Entry entry) {
+ try {
+ filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
+ } catch (Exception e) {
+ LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
+ return false;
+ }
+ return true;
+ }
+
+ public boolean updateEntry(String parent, FilerProto.Entry entry) {
+ try {
+ filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
+ } catch (Exception e) {
+ LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
+ return false;
+ }
+ return true;
+ }
+
+ public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) {
+ try {
+ filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
+ .setDirectory(parent)
+ .setName(entryName)
+ .setIsDeleteData(isDeleteFileChunk)
+ .setIsRecursive(isRecursive)
+ .build());
+ } catch (Exception e) {
+ LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e);
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 124eabcd2..bb33eb48e 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -117,7 +117,7 @@ message UpdateEntryResponse {
message DeleteEntryRequest {
string directory = 1;
string name = 2;
- bool is_directory = 3;
+ // bool is_directory = 3;
bool is_delete_data = 4;
bool is_recursive = 5;
}
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index 522d39aa2..53d6746fa 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop-client</artifactId>
- <version>1.0</version>
+ <version>1.0.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -41,6 +41,18 @@
<goal>shade</goal>
</goals>
<configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>org/slf4j/**</exclude>
+ <exclude>META-INF/maven/org.slf4j/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
@@ -54,6 +66,14 @@
<pattern>io.grpc.internal</pattern>
<shadedPattern>shaded.io.grpc.internal</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>shaded.org.apache.commons</shadedPattern>
+ <excludes>
+ <exclude>org.apache.hadoop</exclude>
+ <exclude>org.apache.log4j</exclude>
+ </excludes>
+ </relocation>
</relocations>
</configuration>
</execution>
@@ -126,7 +146,7 @@
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.0</version>
+ <version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index dce642d09..7d992a7a1 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,17 +5,27 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -259,4 +269,333 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
return path.makeQualified(uri, workingDirectory);
}
+ /**
+ * Concat existing files together.
+ * @param trg the path to the target destination.
+ * @param psrcs the paths to the sources to use for the concatenation.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ @Override
+ public void concat(final Path trg, final Path [] psrcs) throws IOException {
+ throw new UnsupportedOperationException("Not implemented by the " +
+ getClass().getSimpleName() + " FileSystem implementation");
+ }
+
+ /**
+ * Truncate the file in the indicated path to the indicated size.
+ * <ul>
+ * <li>Fails if path is a directory.</li>
+ * <li>Fails if path does not exist.</li>
+ * <li>Fails if path is not closed.</li>
+ * <li>Fails if new size is greater than current size.</li>
+ * </ul>
+ * @param f The path to the file to be truncated
+ * @param newLength The size the file is to be truncated to
+ *
+ * @return <code>true</code> if the file has been truncated to the desired
+ * <code>newLength</code> and is immediately available to be reused for
+ * write operations such as <code>append</code>, or
+ * <code>false</code> if a background process of adjusting the length of
+ * the last block has been started, and clients should wait for it to
+ * complete before proceeding with further file updates.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ @Override
+ public boolean truncate(Path f, long newLength) throws IOException {
+ throw new UnsupportedOperationException("Not implemented by the " +
+ getClass().getSimpleName() + " FileSystem implementation");
+ }
+
+ /**
+ * See {@link FileContext#createSymlink(Path, Path, boolean)}.
+ */
+ @Override
+ public void createSymlink(final Path target, final Path link,
+ final boolean createParent) throws AccessControlException,
+ FileAlreadyExistsException, FileNotFoundException,
+ ParentNotDirectoryException, UnsupportedFileSystemException,
+ IOException {
+ // Supporting filesystems should override this method
+ throw new UnsupportedOperationException(
+ "Filesystem does not support symlinks!");
+ }
+
+ /**
+ * See {@link AbstractFileSystem#supportsSymlinks()}.
+ */
+ @Override
+ public boolean supportsSymlinks() {
+ return false;
+ }
+
+ /**
+ * Create a snapshot.
+ * @param path The directory where snapshots will be taken.
+ * @param snapshotName The name of the snapshot
+ * @return the snapshot path.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ */
+ @Override
+ public Path createSnapshot(Path path, String snapshotName)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support createSnapshot");
+ }
+
+ /**
+ * Rename a snapshot.
+ * @param path The directory path where the snapshot was taken
+ * @param snapshotOldName Old name of the snapshot
+ * @param snapshotNewName New name of the snapshot
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void renameSnapshot(Path path, String snapshotOldName,
+ String snapshotNewName) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support renameSnapshot");
+ }
+
+ /**
+ * Delete a snapshot of a directory.
+ * @param path The directory that the to-be-deleted snapshot belongs to
+ * @param snapshotName The name of the snapshot
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void deleteSnapshot(Path path, String snapshotName)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support deleteSnapshot");
+ }
+
+ /**
+ * Modifies ACL entries of files and directories. This method can add new ACL
+ * entries or modify the permissions on existing ACL entries. All existing
+ * ACL entries that are not specified in this call are retained without
+ * changes. (Modifications are merged into the current ACL.)
+ *
+ * @param path Path to modify
+ * @param aclSpec List&lt;AclEntry&gt; describing modifications
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support modifyAclEntries");
+ }
+
+ /**
+ * Removes ACL entries from files and directories. Other ACL entries are
+ * retained.
+ *
+ * @param path Path to modify
+ * @param aclSpec List describing entries to remove
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeAclEntries");
+ }
+
+ /**
+ * Removes all default ACL entries from files and directories.
+ *
+ * @param path Path to modify
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeDefaultAcl(Path path)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeDefaultAcl");
+ }
+
+ /**
+ * Removes all but the base ACL entries of files and directories. The entries
+ * for user, group, and others are retained for compatibility with permission
+ * bits.
+ *
+ * @param path Path to modify
+ * @throws IOException if an ACL could not be removed
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeAcl(Path path)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeAcl");
+ }
+
+ /**
+ * Fully replaces ACL of files and directories, discarding all existing
+ * entries.
+ *
+ * @param path Path to modify
+ * @param aclSpec List describing modifications, which must include entries
+ * for user, group, and others for compatibility with permission bits.
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setAcl");
+ }
+
+ /**
+ * Gets the ACL of a file or directory.
+ *
+ * @param path Path to get
+ * @return AclStatus describing the ACL of the file or directory
+ * @throws IOException if an ACL could not be read
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public AclStatus getAclStatus(Path path) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getAclStatus");
+ }
+
+ /**
+ * Set an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to modify
+ * @param name xattr name.
+ * @param value xattr value.
+ * @param flag xattr set flag
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void setXAttr(Path path, String name, byte[] value,
+ EnumSet<XAttrSetFlag> flag) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setXAttr");
+ }
+
+ /**
+ * Get an xattr name and value for a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attribute
+ * @param name xattr name.
+ * @return byte[] xattr value.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public byte[] getXAttr(Path path, String name) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttr");
+ }
+
+ /**
+ * Get all of the xattr name/value pairs for a file or directory.
+ * Only those xattrs which the logged-in user has permissions to view
+ * are returned.
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @return Map describing the XAttrs of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttrs");
+ }
+
+ /**
+ * Get all of the xattrs name/value pairs for a file or directory.
+ * Only those xattrs which the logged-in user has permissions to view
+ * are returned.
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @param names XAttr names.
+ * @return Map describing the XAttrs of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttrs");
+ }
+
+ /**
+ * Get all of the xattr names for a file or directory.
+ * Only those xattr names which the logged-in user has permissions to view
+ * are returned.
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @return List{@literal <String>} of the XAttr names of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public List<String> listXAttrs(Path path) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support listXAttrs");
+ }
+
+ /**
+ * Remove an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to remove extended attribute
+ * @param name xattr name
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeXAttr(Path path, String name) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeXAttr");
+ }
+
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index c4b5683ce..ffc109b20 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -7,6 +7,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
@@ -23,10 +24,12 @@ public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
private FilerGrpcClient filerGrpcClient;
+ private FilerClient filerClient;
public SeaweedFileSystemStore(String host, int port) {
int grpcPort = 10000 + port;
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
+ filerClient = new FilerClient(filerGrpcClient);
}
public static String getParentDirectory(Path path) {
@@ -49,23 +52,12 @@ public class SeaweedFileSystemStore {
permission,
umask);
- long now = System.currentTimeMillis() / 1000L;
-
- FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(getParentDirectory(path))
- .setEntry(FilerProto.Entry.newBuilder()
- .setName(path.getName())
- .setIsDirectory(true)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setMtime(now)
- .setCrtime(now)
- .setFileMode(permissionToMode(permission, true))
- .setUserName(currentUser.getUserName())
- .addAllGroupName(Arrays.asList(currentUser.getGroupNames())))
- );
-
- FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build());
- return true;
+ return filerClient.mkdirs(
+ path.toUri().getPath(),
+ permissionToMode(permission, true),
+ currentUser.getUserName(),
+ currentUser.getGroupNames()
+ );
}
public FileStatus[] listEntries(final Path path) {
@@ -73,7 +65,7 @@ public class SeaweedFileSystemStore {
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
- List<FilerProto.Entry> entries = lookupEntries(path);
+ List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
for (FilerProto.Entry entry : entries) {
@@ -84,16 +76,6 @@ public class SeaweedFileSystemStore {
return fileStatuses.toArray(new FileStatus[0]);
}
- private List<FilerProto.Entry> lookupEntries(Path path) {
-
- LOG.debug("listEntries path: {}", path);
-
- return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
- .setDirectory(path.toUri().getPath())
- .setLimit(100000)
- .build()).getEntriesList();
- }
-
public FileStatus getFileStatus(final Path path) {
FilerProto.Entry entry = lookupEntry(path);
@@ -106,32 +88,24 @@ public class SeaweedFileSystemStore {
return fileStatus;
}
- public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) {
+ public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
path,
- String.valueOf(isDirectroy),
+ String.valueOf(isDirectory),
String.valueOf(recursive));
if (path.isRoot()) {
return true;
}
- if (recursive && isDirectroy) {
- List<FilerProto.Entry> entries = lookupEntries(path);
+ if (recursive && isDirectory) {
+ List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
for (FilerProto.Entry entry : entries) {
- deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), recursive);
+ deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true);
}
}
- FilerProto.DeleteEntryResponse response =
- filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
- .setDirectory(getParentDirectory(path))
- .setName(path.getName())
- .setIsDirectory(isDirectroy)
- .setIsDeleteData(true)
- .build());
-
- return true;
+ return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive);
}
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
@@ -151,18 +125,8 @@ public class SeaweedFileSystemStore {
private FilerProto.Entry lookupEntry(Path path) {
- String directory = getParentDirectory(path);
+ return filerClient.lookupEntry(getParentDirectory(path), path.getName());
- try {
- FilerProto.LookupDirectoryEntryResponse response =
- filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
- .setDirectory(directory)
- .setName(path.getName())
- .build());
- return response.getEntry();
- } catch (io.grpc.StatusRuntimeException e) {
- return null;
- }
}
public void rename(Path source, Path destination) {
@@ -186,9 +150,16 @@ public class SeaweedFileSystemStore {
LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination);
+ FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
+ boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build());
+
+ if (!isDirectoryCreated) {
+ return false;
+ }
+
if (entry.getIsDirectory()) {
Path entryPath = new Path(oldParent, entry.getName());
- List<FilerProto.Entry> entries = lookupEntries(entryPath);
+ List<FilerProto.Entry> entries = filerClient.listEntries(entryPath.toUri().getPath());
for (FilerProto.Entry ent : entries) {
boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName()));
if (!isSucess) {
@@ -197,20 +168,9 @@ public class SeaweedFileSystemStore {
}
}
- FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
- filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(getParentDirectory(destination))
- .setEntry(newEntry)
- .build());
-
- filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
- .setDirectory(oldParent.toUri().getPath())
- .setName(entry.getName())
- .setIsDirectory(entry.getIsDirectory())
- .setIsDeleteData(false)
- .build());
-
- return true;
+ return filerClient.deleteEntry(
+ oldParent.toUri().getPath(), entry.getName(), false, false);
+
}
public OutputStream createFile(final Path path,
@@ -253,6 +213,7 @@ public class SeaweedFileSystemStore {
.setCrtime(now)
.setMtime(now)
.setUserName(userGroupInformation.getUserName())
+ .clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
}
@@ -306,10 +267,7 @@ public class SeaweedFileSystemStore {
LOG.debug("setOwner path:{} entry:{}", path, entryBuilder);
- filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
- .setDirectory(getParentDirectory(path))
- .setEntry(entryBuilder)
- .build());
+ filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
@@ -332,10 +290,7 @@ public class SeaweedFileSystemStore {
LOG.debug("setPermission path:{} entry:{}", path, entryBuilder);
- filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
- .setDirectory(getParentDirectory(path))
- .setEntry(entryBuilder)
- .build());
+ filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
}