aboutsummaryrefslogtreecommitdiff
path: root/other/java/hdfs
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/hdfs')
-rw-r--r--other/java/hdfs/pom.xml9
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java51
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java120
3 files changed, 173 insertions, 7 deletions
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index 0892c85e8..1fb554a19 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -4,8 +4,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>seaweed.hadoop</groupId>
- <artifactId>seaweedfs</artifactId>
+ <groupId>seaweedfs</groupId>
+ <artifactId>hadoop-client</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
@@ -18,6 +18,11 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>seaweedfs</groupId>
+ <artifactId>client</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 9b1a842c8..217f4a10e 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -3,9 +3,11 @@ package seaweed.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import java.io.FileNotFoundException;
@@ -20,6 +22,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
private URI uri;
private Path workingDirectory = new Path("/");
+ private SeaweedFileSystemStore seaweedFileSystemStore;
public URI getUri() {
return uri;
@@ -48,6 +51,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
setConf(conf);
this.uri = uri;
+
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
}
public FSDataInputStream open(Path path, int i) throws IOException {
@@ -66,12 +71,24 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
return false;
}
- public boolean delete(Path path, boolean b) throws IOException {
- return false;
+ public boolean delete(Path path, boolean recursive) throws IOException {
+
+ path = qualify(path);
+
+ FileStatus fileStatus = getFileStatus(path);
+ if (fileStatus == null) {
+ return true;
+ }
+
+ return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
+
}
public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
- return new FileStatus[0];
+
+ path = qualify(path);
+
+ return seaweedFileSystemStore.listEntries(path);
}
public Path getWorkingDirectory() {
@@ -87,10 +104,34 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
- return false;
+
+ path = qualify(path);
+
+ try {
+ FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus.isDirectory()) {
+ return true;
+ } else {
+ throw new FileAlreadyExistsException("Path is a file: " + path);
+ }
+ } catch (FileNotFoundException e) {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ return seaweedFileSystemStore.createDirectory(path, currentUser,
+ fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
+ FsPermission.getUMask(getConf()));
+ }
}
public FileStatus getFileStatus(Path path) throws IOException {
- return null;
+
+ path = qualify(path);
+
+ return seaweedFileSystemStore.getFileStatus(path);
+ }
+
+ Path qualify(Path path) {
+ return path.makeQualified(uri, workingDirectory);
}
+
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
new file mode 100644
index 000000000..085d5d217
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -0,0 +1,120 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class SeaweedFileSystemStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
+
+ private FilerGrpcClient filerGrpcClient;
+
+ public SeaweedFileSystemStore(String host, int port) {
+ filerGrpcClient = new FilerGrpcClient(host, port);
+ }
+
+ public boolean createDirectory(final Path path, UserGroupInformation currentUser,
+ final FsPermission permission, final FsPermission umask) {
+
+ LOG.debug("createDirectory path: {} permission: {} umask: {}",
+ path,
+ permission,
+ umask);
+
+ long now = System.currentTimeMillis() / 1000L;
+
+ FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setEntry(FilerProto.Entry.newBuilder()
+ .setName(path.getName())
+ .setIsDirectory(true)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setFileMode(permission.toShort())
+ .setUserName(currentUser.getUserName())
+ .addAllGroupName(Arrays.asList(currentUser.getGroupNames())))
+ );
+
+ FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build());
+ return true;
+ }
+
+ public FileStatus[] listEntries(final Path path) {
+ LOG.debug("listEntries path: {}", path);
+
+ List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
+
+ FilerProto.ListEntriesResponse response =
+ filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ .setDirectory(path.toUri().getPath())
+ .setLimit(100000)
+ .build());
+
+ for (FilerProto.Entry entry : response.getEntriesList()) {
+
+ FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry);
+
+ fileStatuses.add(fileStatus);
+ }
+ return fileStatuses.toArray(new FileStatus[0]);
+ }
+
+ public FileStatus getFileStatus(final Path path) {
+ LOG.debug("getFileStatus path: {}", path);
+
+ FilerProto.LookupDirectoryEntryResponse response =
+ filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setName(path.getName())
+ .build());
+
+ FilerProto.Entry entry = response.getEntry();
+ FileStatus fileStatus = getFileStatus(path, entry);
+ return fileStatus;
+ }
+
+ public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) {
+ LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
+ path,
+ String.valueOf(isDirectroy),
+ String.valueOf(recursive));
+
+ FilerProto.DeleteEntryResponse response =
+ filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setName(path.getName())
+ .setIsDirectory(isDirectroy)
+ .setIsDeleteData(true)
+ .build());
+
+ return true;
+ }
+
+
+ private FileStatus getFileStatus(Path path, FilerProto.Entry entry) {
+ FilerProto.FuseAttributes attributes = entry.getAttributes();
+ long length = attributes.getFileSize();
+ boolean isDir = entry.getIsDirectory();
+ int block_replication = 1;
+ int blocksize = 512;
+ long modification_time = attributes.getMtime();
+ long access_time = 0;
+ FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
+ String owner = attributes.getUserName();
+ String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
+ return new FileStatus(length, isDir, block_replication, blocksize,
+ modification_time, access_time, permission, owner, group, null, path);
+ }
+
+}