aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-11-25 13:43:26 -0800
committerChris Lu <chris.lu@gmail.com>2018-11-25 13:43:26 -0800
commit1cbd53c01c5d4c35c1650b170346b48d32a4bcf7 (patch)
tree6ad96967204e84355a81f59f78bdc1e9a6199111 /other/java
parentd9871e92d2a7afe21804e62f211b468a72a216f4 (diff)
downloadseaweedfs-1cbd53c01c5d4c35c1650b170346b48d32a4bcf7.tar.xz
seaweedfs-1cbd53c01c5d4c35c1650b170346b48d32a4bcf7.zip
WIP SeaweedFileSystem added mkdirs, getFileStatus, listStatus, delete
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml70
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java45
-rw-r--r--other/java/client/src/main/proto/filer.proto178
-rw-r--r--other/java/hdfs/pom.xml9
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java51
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java120
6 files changed, 466 insertions, 7 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
new file mode 100644
index 000000000..07015145e
--- /dev/null
+++ b/other/java/client/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>seaweedfs</groupId>
+ <artifactId>client</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <protobuf.version>3.5.1</protobuf.version>
+ <grpc.version>1.16.1</grpc.version>
+ </properties>
+
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.5.0.Final</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}-1:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
new file mode 100644
index 000000000..47712bc37
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -0,0 +1,45 @@
+package seaweedfs.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+public class FilerGrpcClient {
+
+ private static final Logger logger = Logger.getLogger(FilerGrpcClient.class.getName());
+
+ private final ManagedChannel channel;
+ private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
+ private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
+ private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+
+
+ public FilerGrpcClient(String host, int port) {
+ this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
+ }
+
+ public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
+ channel = channelBuilder.build();
+ blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
+ asyncStub = SeaweedFilerGrpc.newStub(channel);
+ futureStub = SeaweedFilerGrpc.newFutureStub(channel);
+ }
+
+ public void shutdown() throws InterruptedException {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() {
+ return blockingStub;
+ }
+
+ public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() {
+ return asyncStub;
+ }
+
+ public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
+ return futureStub;
+ }
+}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
new file mode 100644
index 000000000..124eabcd2
--- /dev/null
+++ b/other/java/client/src/main/proto/filer.proto
@@ -0,0 +1,178 @@
+syntax = "proto3";
+
+package filer_pb;
+
+option java_package = "seaweedfs.client";
+option java_outer_classname = "FilerProto";
+
+//////////////////////////////////////////////////
+
+service SeaweedFiler {
+
+ rpc LookupDirectoryEntry (LookupDirectoryEntryRequest) returns (LookupDirectoryEntryResponse) {
+ }
+
+ rpc ListEntries (ListEntriesRequest) returns (ListEntriesResponse) {
+ }
+
+ rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) {
+ }
+
+ rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) {
+ }
+
+ rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
+ }
+
+ rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) {
+ }
+
+ rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
+ }
+
+ rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
+ }
+
+ rpc Statistics (StatisticsRequest) returns (StatisticsResponse) {
+ }
+
+}
+
+//////////////////////////////////////////////////
+
+message LookupDirectoryEntryRequest {
+ string directory = 1;
+ string name = 2;
+}
+
+message LookupDirectoryEntryResponse {
+ Entry entry = 1;
+}
+
+message ListEntriesRequest {
+ string directory = 1;
+ string prefix = 2;
+ string startFromFileName = 3;
+ bool inclusiveStartFrom = 4;
+ uint32 limit = 5;
+}
+
+message ListEntriesResponse {
+ repeated Entry entries = 1;
+}
+
+message Entry {
+ string name = 1;
+ bool is_directory = 2;
+ repeated FileChunk chunks = 3;
+ FuseAttributes attributes = 4;
+ map<string, bytes> extended = 5;
+}
+
+message EventNotification {
+ Entry old_entry = 1;
+ Entry new_entry = 2;
+ bool delete_chunks = 3;
+}
+
+message FileChunk {
+ string file_id = 1;
+ int64 offset = 2;
+ uint64 size = 3;
+ int64 mtime = 4;
+ string e_tag = 5;
+ string source_file_id = 6;
+}
+
+message FuseAttributes {
+ uint64 file_size = 1;
+ int64 mtime = 2; // unix time in seconds
+ uint32 file_mode = 3;
+ uint32 uid = 4;
+ uint32 gid = 5;
+ int64 crtime = 6; // unix time in seconds
+ string mime = 7;
+ string replication = 8;
+ string collection = 9;
+ int32 ttl_sec = 10;
+ string user_name = 11; // for hdfs
+ repeated string group_name = 12; // for hdfs
+}
+
+message CreateEntryRequest {
+ string directory = 1;
+ Entry entry = 2;
+}
+
+message CreateEntryResponse {
+}
+
+message UpdateEntryRequest {
+ string directory = 1;
+ Entry entry = 2;
+}
+message UpdateEntryResponse {
+}
+
+message DeleteEntryRequest {
+ string directory = 1;
+ string name = 2;
+ bool is_directory = 3;
+ bool is_delete_data = 4;
+ bool is_recursive = 5;
+}
+
+message DeleteEntryResponse {
+}
+
+message AssignVolumeRequest {
+ int32 count = 1;
+ string collection = 2;
+ string replication = 3;
+ int32 ttl_sec = 4;
+ string data_center = 5;
+}
+
+message AssignVolumeResponse {
+ string file_id = 1;
+ string url = 2;
+ string public_url = 3;
+ int32 count = 4;
+}
+
+message LookupVolumeRequest {
+ repeated string volume_ids = 1;
+}
+
+message Locations {
+ repeated Location locations = 1;
+}
+
+message Location {
+ string url = 1;
+ string public_url = 2;
+}
+message LookupVolumeResponse {
+ map<string, Locations> locations_map = 1;
+}
+
+message DeleteCollectionRequest {
+ string collection = 1;
+}
+
+message DeleteCollectionResponse {
+}
+
+message StatisticsRequest {
+ string replication = 1;
+ string collection = 2;
+ string ttl = 3;
+}
+message StatisticsResponse {
+ string replication = 1;
+ string collection = 2;
+ string ttl = 3;
+ uint64 total_size = 4;
+ uint64 used_size = 5;
+ uint64 file_count = 6;
+}
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index 0892c85e8..1fb554a19 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -4,8 +4,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>seaweed.hadoop</groupId>
- <artifactId>seaweedfs</artifactId>
+ <groupId>seaweedfs</groupId>
+ <artifactId>hadoop-client</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
@@ -18,6 +18,11 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>seaweedfs</groupId>
+ <artifactId>client</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 9b1a842c8..217f4a10e 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -3,9 +3,11 @@ package seaweed.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import java.io.FileNotFoundException;
@@ -20,6 +22,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
private URI uri;
private Path workingDirectory = new Path("/");
+ private SeaweedFileSystemStore seaweedFileSystemStore;
public URI getUri() {
return uri;
@@ -48,6 +51,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
setConf(conf);
this.uri = uri;
+
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
}
public FSDataInputStream open(Path path, int i) throws IOException {
@@ -66,12 +71,24 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
return false;
}
- public boolean delete(Path path, boolean b) throws IOException {
- return false;
+ public boolean delete(Path path, boolean recursive) throws IOException {
+
+ path = qualify(path);
+
+ FileStatus fileStatus = getFileStatus(path);
+ if (fileStatus == null) {
+ return true;
+ }
+
+ return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
+
}
public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
- return new FileStatus[0];
+
+ path = qualify(path);
+
+ return seaweedFileSystemStore.listEntries(path);
}
public Path getWorkingDirectory() {
@@ -87,10 +104,34 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
- return false;
+
+ path = qualify(path);
+
+ try {
+ FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus.isDirectory()) {
+ return true;
+ } else {
+ throw new FileAlreadyExistsException("Path is a file: " + path);
+ }
+ } catch (FileNotFoundException e) {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ return seaweedFileSystemStore.createDirectory(path, currentUser,
+ fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
+ FsPermission.getUMask(getConf()));
+ }
}
public FileStatus getFileStatus(Path path) throws IOException {
- return null;
+
+ path = qualify(path);
+
+ return seaweedFileSystemStore.getFileStatus(path);
+ }
+
+ Path qualify(Path path) {
+ return path.makeQualified(uri, workingDirectory);
}
+
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
new file mode 100644
index 000000000..085d5d217
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -0,0 +1,120 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class SeaweedFileSystemStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
+
+ private FilerGrpcClient filerGrpcClient;
+
+ public SeaweedFileSystemStore(String host, int port) {
+ filerGrpcClient = new FilerGrpcClient(host, port);
+ }
+
+ public boolean createDirectory(final Path path, UserGroupInformation currentUser,
+ final FsPermission permission, final FsPermission umask) {
+
+ LOG.debug("createDirectory path: {} permission: {} umask: {}",
+ path,
+ permission,
+ umask);
+
+ long now = System.currentTimeMillis() / 1000L;
+
+ FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setEntry(FilerProto.Entry.newBuilder()
+ .setName(path.getName())
+ .setIsDirectory(true)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setFileMode(permission.toShort())
+ .setUserName(currentUser.getUserName())
+ .addAllGroupName(Arrays.asList(currentUser.getGroupNames())))
+ );
+
+ FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build());
+ return true;
+ }
+
+ public FileStatus[] listEntries(final Path path) {
+ LOG.debug("listEntries path: {}", path);
+
+ List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
+
+ FilerProto.ListEntriesResponse response =
+ filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ .setDirectory(path.toUri().getPath())
+ .setLimit(100000)
+ .build());
+
+ for (FilerProto.Entry entry : response.getEntriesList()) {
+
+ FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry);
+
+ fileStatuses.add(fileStatus);
+ }
+ return fileStatuses.toArray(new FileStatus[0]);
+ }
+
+ public FileStatus getFileStatus(final Path path) {
+ LOG.debug("getFileStatus path: {}", path);
+
+ FilerProto.LookupDirectoryEntryResponse response =
+ filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setName(path.getName())
+ .build());
+
+ FilerProto.Entry entry = response.getEntry();
+ FileStatus fileStatus = getFileStatus(path, entry);
+ return fileStatus;
+ }
+
+ public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) {
+ LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
+ path,
+ String.valueOf(isDirectroy),
+ String.valueOf(recursive));
+
+ FilerProto.DeleteEntryResponse response =
+ filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setName(path.getName())
+ .setIsDirectory(isDirectroy)
+ .setIsDeleteData(true)
+ .build());
+
+ return true;
+ }
+
+
+ private FileStatus getFileStatus(Path path, FilerProto.Entry entry) {
+ FilerProto.FuseAttributes attributes = entry.getAttributes();
+ long length = attributes.getFileSize();
+ boolean isDir = entry.getIsDirectory();
+ int block_replication = 1;
+ int blocksize = 512;
+ long modification_time = attributes.getMtime();
+ long access_time = 0;
+ FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
+ String owner = attributes.getUserName();
+ String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
+ return new FileStatus(length, isDir, block_replication, blocksize,
+ modification_time, access_time, permission, owner, group, null, path);
+ }
+
+}