diff options
Diffstat (limited to 'other/java/hdfs/src')
| -rw-r--r-- | other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 51 | ||||
| -rw-r--r-- | other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 120 |
2 files changed, 166 insertions, 5 deletions
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 9b1a842c8..217f4a10e 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -3,9 +3,11 @@ package seaweed.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import java.io.FileNotFoundException; @@ -20,6 +22,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { private URI uri; private Path workingDirectory = new Path("/"); + private SeaweedFileSystemStore seaweedFileSystemStore; public URI getUri() { return uri; @@ -48,6 +51,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { setConf(conf); this.uri = uri; + + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); } public FSDataInputStream open(Path path, int i) throws IOException { @@ -66,12 +71,24 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { return false; } - public boolean delete(Path path, boolean b) throws IOException { - return false; + public boolean delete(Path path, boolean recursive) throws IOException { + + path = qualify(path); + + FileStatus fileStatus = getFileStatus(path); + if (fileStatus == null) { + return true; + } + + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); + } public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { - return new FileStatus[0]; + + path = qualify(path); + + return seaweedFileSystemStore.listEntries(path); } public Path getWorkingDirectory() { @@ -87,10 +104,34 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { - return false; + + path = qualify(path); + + try { + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } catch (FileNotFoundException e) { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + return seaweedFileSystemStore.createDirectory(path, currentUser, + fsPermission == null ? FsPermission.getDirDefault() : fsPermission, + FsPermission.getUMask(getConf())); + } } public FileStatus getFileStatus(Path path) throws IOException { - return null; + + path = qualify(path); + + return seaweedFileSystemStore.getFileStatus(path); + } + + Path qualify(Path path) { + return path.makeQualified(uri, workingDirectory); } + } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java new file mode 100644 index 000000000..085d5d217 --- /dev/null +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -0,0 +1,120 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerProto; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class SeaweedFileSystemStore { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); + + private FilerGrpcClient filerGrpcClient; + + public SeaweedFileSystemStore(String host, int port) { + filerGrpcClient = new FilerGrpcClient(host, port); + } + + public boolean createDirectory(final Path path, UserGroupInformation currentUser, + final FsPermission permission, final FsPermission umask) { + + LOG.debug("createDirectory path: {} permission: {} umask: {}", + path, + permission, + umask); + + long now = System.currentTimeMillis() / 1000L; + + FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setEntry(FilerProto.Entry.newBuilder() + .setName(path.getName()) + .setIsDirectory(true) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setFileMode(permission.toShort()) + .setUserName(currentUser.getUserName()) + .addAllGroupName(Arrays.asList(currentUser.getGroupNames()))) + ); + + FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build()); + return true; + } + + public FileStatus[] listEntries(final Path path) { + LOG.debug("listEntries path: {}", path); + + List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); + + FilerProto.ListEntriesResponse response = + filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + .setDirectory(path.toUri().getPath()) + .setLimit(100000) + .build()); + + for (FilerProto.Entry entry : response.getEntriesList()) { + + FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry); + + fileStatuses.add(fileStatus); + } + return fileStatuses.toArray(new FileStatus[0]); + } + + public FileStatus getFileStatus(final Path path) { + LOG.debug("getFileStatus path: {}", path); + + FilerProto.LookupDirectoryEntryResponse response = + filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setName(path.getName()) + .build()); + + FilerProto.Entry entry = response.getEntry(); + FileStatus fileStatus = getFileStatus(path, entry); + return fileStatus; + } + + public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) { + LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", + path, + String.valueOf(isDirectroy), + String.valueOf(recursive)); + + FilerProto.DeleteEntryResponse response = + filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setName(path.getName()) + .setIsDirectory(isDirectroy) + .setIsDeleteData(true) + .build()); + + return true; + } + + + private FileStatus getFileStatus(Path path, FilerProto.Entry entry) { + FilerProto.FuseAttributes attributes = entry.getAttributes(); + long length = attributes.getFileSize(); + boolean isDir = entry.getIsDirectory(); + int block_replication = 1; + int blocksize = 512; + long modification_time = attributes.getMtime(); + long access_time = 0; + FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); + String owner = attributes.getUserName(); + String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : ""; + return new FileStatus(length, isDir, block_replication, blocksize, + modification_time, access_time, permission, owner, group, null, path); + } + +} |
