diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-11-25 13:43:26 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-11-25 13:43:26 -0800 |
| commit | 1cbd53c01c5d4c35c1650b170346b48d32a4bcf7 (patch) | |
| tree | 6ad96967204e84355a81f59f78bdc1e9a6199111 /other/java | |
| parent | d9871e92d2a7afe21804e62f211b468a72a216f4 (diff) | |
| download | seaweedfs-1cbd53c01c5d4c35c1650b170346b48d32a4bcf7.tar.xz seaweedfs-1cbd53c01c5d4c35c1650b170346b48d32a4bcf7.zip | |
WIP SeaweedFileSystem added mkdirs, getFileStatus, listStatus, delete
Diffstat (limited to 'other/java')
| -rw-r--r-- | other/java/client/pom.xml | 70 | ||||
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java | 45 | ||||
| -rw-r--r-- | other/java/client/src/main/proto/filer.proto | 178 | ||||
| -rw-r--r-- | other/java/hdfs/pom.xml | 9 | ||||
| -rw-r--r-- | other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 51 | ||||
| -rw-r--r-- | other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 120 |
6 files changed, 466 insertions, 7 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml new file mode 100644 index 000000000..07015145e --- /dev/null +++ b/other/java/client/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>seaweedfs</groupId> + <artifactId>client</artifactId> + <version>1.0-SNAPSHOT</version> + + <properties> + <protobuf.version>3.5.1</protobuf.version> + <grpc.version>1.16.1</grpc.version> + </properties> + + <dependencies> + <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + </dependencies> + + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.5.0.Final</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}-1:exe:${os.detected.classifier}</protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java new file mode 100644 index 000000000..47712bc37 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -0,0 +1,45 @@ +package seaweedfs.client; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class FilerGrpcClient { + + private static final Logger logger = Logger.getLogger(FilerGrpcClient.class.getName()); + + private final ManagedChannel channel; + private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; + private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; + private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + + + public FilerGrpcClient(String host, int port) { + this(ManagedChannelBuilder.forAddress(host, port).usePlaintext()); + } + + public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { + channel = channelBuilder.build(); + blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); + asyncStub = SeaweedFilerGrpc.newStub(channel); + futureStub = SeaweedFilerGrpc.newFutureStub(channel); + } + + public void shutdown() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + + public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() { + return blockingStub; + } + + public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() { + return asyncStub; + } + + public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { + return futureStub; + } +} diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto new file mode 100644 index 000000000..124eabcd2 --- /dev/null +++ b/other/java/client/src/main/proto/filer.proto @@ -0,0 +1,178 @@ +syntax = "proto3"; + +package filer_pb; + +option java_package = "seaweedfs.client"; +option java_outer_classname = "FilerProto"; + +////////////////////////////////////////////////// + +service SeaweedFiler { + + rpc LookupDirectoryEntry (LookupDirectoryEntryRequest) returns (LookupDirectoryEntryResponse) { + } + + rpc ListEntries (ListEntriesRequest) returns (ListEntriesResponse) { + } + + rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) { + } + + rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) { + } + + rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { + } + + rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) { + } + + rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) { + } + + rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) { + } + + rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { + } + +} + +////////////////////////////////////////////////// + +message LookupDirectoryEntryRequest { + string directory = 1; + string name = 2; +} + +message LookupDirectoryEntryResponse { + Entry entry = 1; +} + +message ListEntriesRequest { + string directory = 1; + string prefix = 2; + string startFromFileName = 3; + bool inclusiveStartFrom = 4; + uint32 limit = 5; +} + +message ListEntriesResponse { + repeated Entry entries = 1; +} + +message Entry { + string name = 1; + bool is_directory = 2; + repeated FileChunk chunks = 3; + FuseAttributes attributes = 4; + map<string, bytes> extended = 5; +} + +message EventNotification { + Entry old_entry = 1; + Entry new_entry = 2; + bool delete_chunks = 3; +} + +message FileChunk { + string file_id = 1; + int64 offset = 2; + uint64 size = 3; + int64 mtime = 4; + string e_tag = 5; + string source_file_id = 6; +} + +message FuseAttributes { + uint64 file_size = 1; + int64 mtime = 2; // unix time in seconds + uint32 file_mode = 3; + uint32 uid = 4; + uint32 gid = 5; + int64 crtime = 6; // unix time in seconds + string mime = 7; + string replication = 8; + string collection = 9; + int32 ttl_sec = 10; + string user_name = 11; // for hdfs + repeated string group_name = 12; // for hdfs +} + +message CreateEntryRequest { + string directory = 1; + Entry entry = 2; +} + +message CreateEntryResponse { +} + +message UpdateEntryRequest { + string directory = 1; + Entry entry = 2; +} +message UpdateEntryResponse { +} + +message DeleteEntryRequest { + string directory = 1; + string name = 2; + bool is_directory = 3; + bool is_delete_data = 4; + bool is_recursive = 5; +} + +message DeleteEntryResponse { +} + +message AssignVolumeRequest { + int32 count = 1; + string collection = 2; + string replication = 3; + int32 ttl_sec = 4; + string data_center = 5; +} + +message AssignVolumeResponse { + string file_id = 1; + string url = 2; + string public_url = 3; + int32 count = 4; +} + +message LookupVolumeRequest { + repeated string volume_ids = 1; +} + +message Locations { + repeated Location locations = 1; +} + +message Location { + string url = 1; + string public_url = 2; +} +message LookupVolumeResponse { + map<string, Locations> locations_map = 1; +} + +message DeleteCollectionRequest { + string collection = 1; +} + +message DeleteCollectionResponse { +} + +message StatisticsRequest { + string replication = 1; + string collection = 2; + string ttl = 3; +} +message StatisticsResponse { + string replication = 1; + string collection = 2; + string ttl = 3; + uint64 total_size = 4; + uint64 used_size = 5; + uint64 file_count = 6; +} diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml index 0892c85e8..1fb554a19 100644 --- a/other/java/hdfs/pom.xml +++ b/other/java/hdfs/pom.xml @@ -4,8 +4,8 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>seaweed.hadoop</groupId> - <artifactId>seaweedfs</artifactId> + <groupId>seaweedfs</groupId> + <artifactId>hadoop-client</artifactId> <version>1.0-SNAPSHOT</version> <properties> @@ -18,6 +18,11 @@ <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>seaweedfs</groupId> + <artifactId>client</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> </dependencies> </project> diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 9b1a842c8..217f4a10e 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -3,9 +3,11 @@ package seaweed.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import java.io.FileNotFoundException; @@ -20,6 +22,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { private URI uri; private Path workingDirectory = new Path("/"); + private SeaweedFileSystemStore seaweedFileSystemStore; public URI getUri() { return uri; @@ -48,6 +51,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { setConf(conf); this.uri = uri; + + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); } public FSDataInputStream open(Path path, int i) throws IOException { @@ -66,12 +71,24 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { return false; } - public boolean delete(Path path, boolean b) throws IOException { - return false; + public boolean delete(Path path, boolean recursive) throws IOException { + + path = qualify(path); + + FileStatus fileStatus = getFileStatus(path); + if (fileStatus == null) { + return true; + } + + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); + } public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { - return new FileStatus[0]; + + path = qualify(path); + + return seaweedFileSystemStore.listEntries(path); } public Path getWorkingDirectory() { @@ -87,10 +104,34 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { - return false; + + path = qualify(path); + + try { + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } catch (FileNotFoundException e) { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + return seaweedFileSystemStore.createDirectory(path, currentUser, + fsPermission == null ? FsPermission.getDirDefault() : fsPermission, + FsPermission.getUMask(getConf())); + } } public FileStatus getFileStatus(Path path) throws IOException { - return null; + + path = qualify(path); + + return seaweedFileSystemStore.getFileStatus(path); + } + + Path qualify(Path path) { + return path.makeQualified(uri, workingDirectory); } + } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java new file mode 100644 index 000000000..085d5d217 --- /dev/null +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -0,0 +1,120 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerProto; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class SeaweedFileSystemStore { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); + + private FilerGrpcClient filerGrpcClient; + + public SeaweedFileSystemStore(String host, int port) { + filerGrpcClient = new FilerGrpcClient(host, port); + } + + public boolean createDirectory(final Path path, UserGroupInformation currentUser, + final FsPermission permission, final FsPermission umask) { + + LOG.debug("createDirectory path: {} permission: {} umask: {}", + path, + permission, + umask); + + long now = System.currentTimeMillis() / 1000L; + + FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setEntry(FilerProto.Entry.newBuilder() + .setName(path.getName()) + .setIsDirectory(true) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setFileMode(permission.toShort()) + .setUserName(currentUser.getUserName()) + .addAllGroupName(Arrays.asList(currentUser.getGroupNames()))) + ); + + FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build()); + return true; + } + + public FileStatus[] listEntries(final Path path) { + LOG.debug("listEntries path: {}", path); + + List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); + + FilerProto.ListEntriesResponse response = + filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + .setDirectory(path.toUri().getPath()) + .setLimit(100000) + .build()); + + for (FilerProto.Entry entry : response.getEntriesList()) { + + FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry); + + fileStatuses.add(fileStatus); + } + return fileStatuses.toArray(new FileStatus[0]); + } + + public FileStatus getFileStatus(final Path path) { + LOG.debug("getFileStatus path: {}", path); + + FilerProto.LookupDirectoryEntryResponse response = + filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setName(path.getName()) + .build()); + + FilerProto.Entry entry = response.getEntry(); + FileStatus fileStatus = getFileStatus(path, entry); + return fileStatus; + } + + public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) { + LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", + path, + String.valueOf(isDirectroy), + String.valueOf(recursive)); + + FilerProto.DeleteEntryResponse response = + filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setName(path.getName()) + .setIsDirectory(isDirectroy) + .setIsDeleteData(true) + .build()); + + return true; + } + + + private FileStatus getFileStatus(Path path, FilerProto.Entry entry) { + FilerProto.FuseAttributes attributes = entry.getAttributes(); + long length = attributes.getFileSize(); + boolean isDir = entry.getIsDirectory(); + int block_replication = 1; + int blocksize = 512; + long modification_time = attributes.getMtime(); + long access_time = 0; + FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); + String owner = attributes.getUserName(); + String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : ""; + return new FileStatus(length, isDir, block_replication, blocksize, + modification_time, access_time, permission, owner, group, null, path); + } + +} |
