Diffstat (limited to 'other/java')
-rw-r--r--  other/java/client/pom.xml | 24
-rw-r--r--  other/java/client/pom.xml.deploy | 170
-rw-r--r--  other/java/client/pom_debug.xml | 144
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java | 42
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ChunkCache.java | 36
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java | 140
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 170
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java | 84
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/Gzip.java | 41
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java | 55
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java | 208
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java) | 181
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 300
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java | 30
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 121
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java | 36
-rw-r--r--  other/java/client/src/main/proto/filer.proto | 152
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java | 42
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java | 5
-rw-r--r--  other/java/examples/pom.xml | 32
-rw-r--r--  other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java | 48
-rw-r--r--  other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java | 46
-rw-r--r--  other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java | 47
-rw-r--r--  other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java | 25
-rw-r--r--  other/java/hdfs-over-ftp/pom.xml | 120
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java | 14
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java | 27
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java | 71
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java | 98
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java | 71
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java | 43
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java | 102
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java | 333
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java | 14
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java | 104
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java | 72
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java | 239
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java | 453
-rw-r--r--  other/java/hdfs-over-ftp/src/main/resources/application.yml | 15
-rw-r--r--  other/java/hdfs-over-ftp/src/main/resources/assembly.xml | 39
-rw-r--r--  other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml | 40
-rw-r--r--  other/java/hdfs-over-ftp/users.properties | 12
-rw-r--r--  other/java/hdfs2/dependency-reduced-pom.xml | 180
-rw-r--r--  other/java/hdfs2/pom.xml | 8
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java | 25
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java | 137
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java | 394
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java | 70
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java (renamed from other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java) | 22
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 72
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 75
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java | 150
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java | 16
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java | 371
-rw-r--r--  other/java/hdfs3/dependency-reduced-pom.xml | 188
-rw-r--r--  other/java/hdfs3/pom.xml | 8
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java | 25
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java | 137
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java | 394
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java | 70
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java) | 22
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 70
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 75
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java | 150
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java | 64
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java | 371
-rw-r--r--  other/java/s3copier/pom.xml | 2
68 files changed, 4804 insertions, 2673 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 0c585a941..f4e522a3e 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.4</version>
+ <version>1.6.4</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -16,7 +17,7 @@
<protobuf.version>3.9.1</protobuf.version>
<!-- follow https://github.com/grpc/grpc-java -->
<grpc.version>1.23.0</grpc.version>
- <guava.version>28.0-jre</guava.version>
+ <guava.version>30.0-jre</guava.version>
</properties>
<dependencies>
@@ -64,9 +65,14 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.12</version>
+ <version>4.13.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
</dependencies>
<distributionManagement>
@@ -88,8 +94,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -97,9 +103,11 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
<pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
</configuration>
<executions>
<execution>
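Note: the new javax.annotation-api dependency is most likely needed because grpc-java's generated stubs are tagged with @javax.annotation.Generated, which no longer ships on the JDK's default classpath since Java 9. A sketch of what the generated code looks like (class body elided; this is an illustration, not part of the commit):

    // from a grpc-java generated stub; the annotation below is what
    // requires the javax.annotation-api artifact on Java 9 and later
    @javax.annotation.Generated(
        value = "by gRPC proto compiler",
        comments = "Source: filer.proto")
    public final class SeaweedFilerGrpc { /* generated methods elided */ }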
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
new file mode 100644
index 000000000..9c8c4f45e
--- /dev/null
+++ b/other/java/client/pom.xml.deploy
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.6.4</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
new file mode 100644
index 000000000..12ea860c2
--- /dev/null
+++ b/other/java/client/pom_debug.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.6.4</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..51053becd
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,42 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+ private static final List<ByteBuffer> bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ bufferSize = MIN_BUFFER_SIZE;
+ }
+ LOG.debug("requested new buffer {}", bufferSize);
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+ if (buffer.capacity() >= bufferSize) {
+ return buffer;
+ }
+
+ LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+ bufferList.add(0, buffer);
+ return ByteBuffer.allocate(bufferSize);
+
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ ((Buffer)obj).clear();
+ bufferList.add(0, obj);
+ }
+
+}
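A minimal usage sketch for the new pool (illustrative fragment, not part of the commit): request() rounds sizes up to the 8 MB minimum and hands back the most recently released buffer when it is large enough; release() clears the buffer and recycles it.

    ByteBuffer buf = ByteBufferPool.request(4 * 1024 * 1024); // rounded up to 8 MB
    buf.put("hello".getBytes());                              // fill as needed
    ByteBufferPool.release(buf);                              // cleared and pooled for reuse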
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
new file mode 100644
index 000000000..58870d742
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -0,0 +1,36 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class ChunkCache {
+
+ private Cache<String, byte[]> cache = null;
+
+ public ChunkCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(1, TimeUnit.HOURS)
+ .build();
+ }
+
+ public byte[] getChunk(String fileId) {
+ if (this.cache == null) {
+ return null;
+ }
+ return this.cache.getIfPresent(fileId);
+ }
+
+ public void setChunk(String fileId, byte[] data) {
+ if (this.cache == null) {
+ return;
+ }
+ this.cache.put(fileId, data);
+ }
+
+}
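Usage sketch (the file id and data are made up): entries expire one hour after last access, and constructing the cache with maxEntries == 0 turns both methods into no-ops.

    ChunkCache cache = new ChunkCache(1000);     // 0 would disable caching entirely
    cache.setChunk("3,1637037d6", chunkBytes);   // hypothetical file id and chunk data
    byte[] hit = cache.getChunk("3,1637037d6");  // null on a miss (or when disabled)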
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..9b6ba5dfc
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,140 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileChunkManifest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+ private static final int mergeFactor = 1000;
+
+ public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (chunk.getIsChunkManifest()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<FilerProto.FileChunk> resolveChunkManifest(
+ final FilerClient filerClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ continue;
+ }
+
+ // IsChunkManifest
+ LOG.debug("fetching chunk manifest:{}", chunk);
+ byte[] data = fetchChunk(filerClient, chunk);
+ FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+ List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk t : m.getChunksList()) {
+ // avoid deprecated chunk.getFileId()
+ resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+ }
+ dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks));
+ }
+
+ return dataChunks;
+ }
+
+ private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException {
+
+ String vid = "" + chunk.getFid().getVolumeId();
+ FilerProto.Locations locations = filerClient.vidLocations.get(vid);
+ if (locations == null) {
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ locations = lookupResponse.getLocationsMapMap().get(vid);
+ filerClient.vidLocations.put(vid, locations);
+ LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
+ }
+
+ SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+ FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+ 0,
+ -1,
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed());
+
+ byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+ if (chunkData == null) {
+ LOG.debug("doFetchFullChunkData:{}", chunkView);
+ chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations);
+ }
+ if (chunk.getIsChunkManifest()){
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+ }
+
+ return chunkData;
+
+ }
+
+ public static List<FilerProto.FileChunk> maybeManifestize(
+ final FilerClient filerClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
+ // the return variable
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : inputChunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ } else {
+ chunks.add(chunk);
+ }
+ }
+
+ int remaining = dataChunks.size();
+ for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
+ chunks.add(chunk);
+ remaining -= mergeFactor;
+ }
+
+ // remaining
+ for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+ chunks.add(dataChunks.get(i));
+ }
+ return chunks;
+ }
+
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
+ // create and serialize the manifest
+ dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
+ FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+ byte[] data = m.build().toByteArray();
+
+ long minOffset = Long.MAX_VALUE;
+ long maxOffset = -1;
+ for (FilerProto.FileChunk chunk : dataChunks) {
+ minOffset = Math.min(minOffset, chunk.getOffset());
+ maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+ }
+
+ FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+ filerClient.getReplication(),
+ filerClient,
+ minOffset,
+ data, 0, data.length, parentDirectory);
+ manifestChunk.setIsChunkManifest(true);
+ manifestChunk.setSize(maxOffset - minOffset);
+ return manifestChunk.build();
+
+ }
+
+}
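The write path folds every mergeFactor (1000) data chunks into a single manifest chunk, so readers must expand manifests before computing read ranges. A reader-side sketch, assuming a connected filerClient and a looked-up entry:

    List<FilerProto.FileChunk> chunks = entry.getChunksList();
    if (FileChunkManifest.hasChunkManifest(chunks)) {
        // recursively replaces each manifest chunk with the data chunks it lists
        chunks = FileChunkManifest.resolveChunkManifest(filerClient, chunks);
    }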
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 84aa26ad9..257a9873d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -1,27 +1,82 @@
package seaweedfs.client;
+import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-public class FilerClient {
+public class FilerClient extends FilerGrpcClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
- private FilerGrpcClient filerGrpcClient;
-
public FilerClient(String host, int grpcPort) {
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
+ super(host, grpcPort);
+ }
+
+ public static String toFileId(FilerProto.FileId fid) {
+ if (fid == null) {
+ return null;
+ }
+ return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ }
+
+ public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+ if (fileIdStr == null || fileIdStr.length() == 0) {
+ return null;
+ }
+ int commaIndex = fileIdStr.lastIndexOf(',');
+ String volumeIdStr = fileIdStr.substring(0, commaIndex);
+ String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+ String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+ return FilerProto.FileId.newBuilder()
+ .setVolumeId(Integer.parseInt(volumeIdStr))
+ .setFileKey(Long.parseLong(fileKeyStr, 16))
+ .setCookie((int) Long.parseLong(cookieStr, 16))
+ .build();
+ }
+
+ public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+ List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : chunks) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.clearFileId();
+ chunkBuilder.clearSourceFileId();
+ chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+ FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+ if (sourceFid != null) {
+ chunkBuilder.setSourceFid(sourceFid);
+ }
+ cleanedChunks.add(chunkBuilder.build());
+ }
+ return cleanedChunks;
}
- public FilerClient(FilerGrpcClient filerGrpcClient) {
- this.filerGrpcClient = filerGrpcClient;
+ public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.setFileId(toFileId(chunk.getFid()));
+ String sourceFileId = toFileId(chunk.getSourceFid());
+ if (sourceFileId != null) {
+ chunkBuilder.setSourceFileId(sourceFileId);
+ }
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
}
public boolean mkdirs(String path, int mode) {
@@ -38,9 +93,9 @@ public class FilerClient {
if ("/".equals(path)) {
return true;
}
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent().replace('\\','/');
+ String name = pathFile.getName();
mkdirs(parent, mode, uid, gid, userName, groupNames);
@@ -59,13 +114,13 @@ public class FilerClient {
public boolean mv(String oldPath, String newPath) {
- Path oldPathObject = Paths.get(oldPath);
- String oldParent = oldPathObject.getParent().toString();
- String oldName = oldPathObject.getFileName().toString();
+ File oldPathFile = new File(oldPath);
+ String oldParent = oldPathFile.getParent().replace('\\','/');
+ String oldName = oldPathFile.getName();
- Path newPathObject = Paths.get(newPath);
- String newParent = newPathObject.getParent().toString();
- String newName = newPathObject.getFileName().toString();
+ File newPathFile = new File(newPath);
+ String newParent = newPathFile.getParent().replace('\\','/');
+ String newName = newPathFile.getName();
return atomicRenameEntry(oldParent, oldName, newParent, newName);
@@ -73,9 +128,9 @@ public class FilerClient {
public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent().replace('\\','/');
+ String name = pathFile.getName();
return deleteEntry(
parent,
@@ -92,9 +147,9 @@ public class FilerClient {
public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent().replace('\\','/');
+ String name = pathFile.getName();
FilerProto.Entry entry = lookupEntry(parent, name);
if (entry == null) {
@@ -156,7 +211,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
- List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+ List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@@ -173,31 +228,35 @@ public class FilerClient {
return results;
}
- public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
- Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
+ Iterator<FilerProto.ListEntriesResponse> iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
+ .setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
- while (iter.hasNext()){
+ while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
- entries.add(fixEntryAfterReading(resp.getEntry()));
+ entries.add(afterEntryDeserialization(resp.getEntry()));
}
return entries;
}
public FilerProto.Entry lookupEntry(String directory, String entryName) {
try {
- FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
+ FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry(
FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(directory)
.setName(entryName)
.build()).getEntry();
- return fixEntryAfterReading(entry);
+ if (entry == null) {
+ return null;
+ }
+ return afterEntryDeserialization(entry);
} catch (Exception e) {
- if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){
+ if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
}
LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
@@ -205,28 +264,32 @@ public class FilerClient {
}
}
-
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
- filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ FilerProto.CreateEntryResponse createEntryResponse =
+ this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
+ if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
+ return true;
+ }
+ LOG.warn("createEntry {}/{} error: {}", parent, entry.getName(), createEntryResponse.getError());
+ return false;
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
}
- return true;
}
public boolean updateEntry(String parent, FilerProto.Entry entry) {
try {
- filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
+ this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
.setDirectory(parent)
.setEntry(entry)
.build());
} catch (Exception e) {
- LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
+ LOG.warn("updateEntry {}/{}: {}", parent, entry.getName(), e);
return false;
}
return true;
@@ -234,7 +297,7 @@ public class FilerClient {
public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) {
try {
- filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
+ this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(parent)
.setName(entryName)
.setIsDeleteData(isDeleteFileChunk)
@@ -250,7 +313,7 @@ public class FilerClient {
public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
try {
- filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
+ this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
.setOldDirectory(oldParent)
.setOldName(oldName)
.setNewDirectory(newParent)
@@ -263,24 +326,13 @@ public class FilerClient {
return true;
}
- private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
- if (entry.getChunksList().size() <= 0) {
- return entry;
- }
- String fileId = entry.getChunks(0).getFileId();
- if (fileId != null && fileId.length() != 0) {
- return entry;
- }
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- entryBuilder.clearChunks();
- for (FilerProto.FileChunk chunk : entry.getChunksList()) {
- FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
- FilerProto.FileId fid = chunk.getFid();
- fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
- chunkBuilder.setFileId(fileId);
- entryBuilder.addChunks(chunkBuilder);
- }
- return entryBuilder.build();
+ public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
+ return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
+ .setPathPrefix(prefix)
+ .setClientName(clientName)
+ .setSinceNs(sinceNs)
+ .build()
+ );
}
}
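The new static helpers define the canonical file-id string as the volume id, a comma, the file key in hex, and the cookie as exactly eight hex digits; the removed fixEntryAfterReading printed the file key in decimal ("%d,%d%x"), which these helpers correct. A round-trip sketch with made-up values:

    FilerProto.FileId fid = FilerProto.FileId.newBuilder()
            .setVolumeId(3)
            .setFileKey(0x1L)
            .setCookie(0x637037d6)
            .build();
    String fileId = FilerClient.toFileId(fid);                   // "3,1637037d6"
    FilerProto.FileId back = FilerClient.toFileIdObject(fileId); // round-trips to fid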
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3626c76de..6c57e2e0d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -9,17 +9,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
-
- private final ManagedChannel channel;
- private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
- private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
- private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
-
static SslContext sslContext;
static {
@@ -30,6 +26,20 @@ public class FilerGrpcClient {
}
}
+ public final int VOLUME_SERVER_ACCESS_DIRECT = 0;
+ public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1;
+ public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
+ public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
+ private final ManagedChannel channel;
+ private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
+ private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
+ private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+ private boolean cipher = false;
+ private String collection = "";
+ private String replication = "";
+ private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ private String filerAddress;
+
public FilerGrpcClient(String host, int grpcPort) {
this(host, grpcPort, sslContext);
}
@@ -37,20 +47,43 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+ .maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
+ filerAddress = String.format("%s:%d", host, grpcPort - 10000);
+
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
+ this.getBlockingStub().getFilerConfiguration(
+ FilerProto.GetFilerConfigurationRequest.newBuilder().build());
+ cipher = filerConfigurationResponse.getCipher();
+ collection = filerConfigurationResponse.getCollection();
+ replication = filerConfigurationResponse.getReplication();
+
}
- public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
+ private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
asyncStub = SeaweedFilerGrpc.newStub(channel);
futureStub = SeaweedFilerGrpc.newFutureStub(channel);
}
+ public boolean isCipher() {
+ return cipher;
+ }
+
+ public String getCollection() {
+ return collection;
+ }
+
+ public String getReplication() {
+ return replication;
+ }
+
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
@@ -67,4 +100,39 @@ public class FilerGrpcClient {
return futureStub;
}
+ public void setAccessVolumeServerDirectly() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ }
+
+ public boolean isAccessVolumeServerDirectly() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT;
+ }
+
+ public void setAccessVolumeServerByPublicUrl() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+
+ public boolean isAccessVolumeServerByPublicUrl() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+
+ public void setAccessVolumeServerByFilerProxy() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+
+ public boolean isAccessVolumeServerByFilerProxy() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+
+ public String getChunkUrl(String chunkId, String url, String publicUrl) {
+ switch (this.volumeServerAccess) {
+ case VOLUME_SERVER_ACCESS_PUBLIC_URL:
+ return String.format("http://%s/%s", publicUrl, chunkId);
+ case VOLUME_SERVER_ACCESS_FILER_PROXY:
+ return String.format("http://%s/?proxyChunkId=%s", this.filerAddress, chunkId);
+ default:
+ return String.format("http://%s/%s", url, chunkId);
+ }
+ }
+
}
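FilerGrpcClient now fetches the filer configuration at construction time and supports three ways of addressing volume servers. The filerAddress derivation assumes the SeaweedFS convention that the gRPC port is the filer HTTP port plus 10000. A sketch with a hypothetical local filer:

    FilerClient client = new FilerClient("localhost", 18888);   // HTTP port 8888 + 10000
    client.setAccessVolumeServerByFilerProxy();
    String url = client.getChunkUrl("3,1637037d6", "vol1:8080", "vol1.example.com:8080");
    // -> "http://localhost:8888/?proxyChunkId=3,1637037d6"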
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
new file mode 100644
index 000000000..4909094f5
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -0,0 +1,41 @@
+package seaweedfs.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class Gzip {
+ public static byte[] compress(byte[] data) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
+ GZIPOutputStream gzip = new GZIPOutputStream(bos);
+ gzip.write(data);
+ gzip.close();
+ byte[] compressed = bos.toByteArray();
+ bos.close();
+ return compressed;
+ }
+
+ public static byte[] decompress(byte[] compressed) {
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ return readAll(gis);
+ } catch (Exception e) {
+ return compressed;
+ }
+ }
+
+ private static byte[] readAll(InputStream input) throws IOException {
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[4096];
+ int n;
+ while (-1 != (n = input.read(buffer))) {
+ output.write(buffer, 0, n);
+ }
+ return output.toByteArray();
+ }
+ }
+}
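Round-trip sketch (illustrative): compress() can throw IOException, while decompress() deliberately swallows failures and returns its input unchanged, so callers may pass it data that may or may not be gzipped.

    byte[] raw = "hello seaweedfs".getBytes(StandardCharsets.UTF_8);
    byte[] packed = Gzip.compress(raw);
    byte[] unpacked = Gzip.decompress(packed); // Arrays.equals(raw, unpacked) is true
    byte[] asIs = Gzip.decompress(raw);        // not gzip data: returned unchanged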
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
new file mode 100644
index 000000000..8d0ebd755
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
@@ -0,0 +1,55 @@
+package seaweedfs.client;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.security.SecureRandom;
+
+public class SeaweedCipher {
+ // AES-GCM parameters
+ public static final int AES_KEY_SIZE = 256; // in bits
+ public static final int GCM_NONCE_LENGTH = 12; // in bytes
+ public static final int GCM_TAG_LENGTH = 16; // in bytes
+
+ private static SecureRandom random = new SecureRandom();
+
+ public static byte[] genCipherKey() throws Exception {
+ byte[] key = new byte[AES_KEY_SIZE / 8];
+ random.nextBytes(key);
+ return key;
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception {
+ return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey);
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception {
+
+ final byte[] nonce = new byte[GCM_NONCE_LENGTH];
+ random.nextBytes(nonce);
+ GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+
+ Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec);
+
+ byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length);
+
+ byte[] iv = AES_cipherInstance.getIV();
+ byte[] message = new byte[GCM_NONCE_LENGTH + length + GCM_TAG_LENGTH];
+ System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length);
+
+ return message;
+ }
+
+ public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception {
+ final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+ AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params);
+ byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH);
+ return decryptedText;
+ }
+
+}
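Round-trip sketch: encrypt() prepends the random 12-byte nonce to the GCM output (ciphertext plus 16-byte tag), and decrypt() reads the nonce back from the front; both declare throws Exception.

    byte[] key = SeaweedCipher.genCipherKey();                       // 32 random bytes
    byte[] sealed = SeaweedCipher.encrypt("secret".getBytes(), key); // nonce || ciphertext || tag
    byte[] opened = SeaweedCipher.decrypt(sealed, key);              // == "secret" bytes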
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
new file mode 100644
index 000000000..4e40ce1b6
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -0,0 +1,208 @@
+package seaweedfs.client;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class SeaweedInputStream extends InputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+ private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
+
+ private final FilerClient filerClient;
+ private final String path;
+ private final FilerProto.Entry entry;
+ private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
+ private final long contentLength;
+
+ private long position = 0; // cursor of the file
+
+ private boolean closed = false;
+
+ public SeaweedInputStream(
+ final FilerClient filerClient,
+ final String fullpath) throws IOException {
+ this.path = fullpath;
+ this.filerClient = filerClient;
+ this.entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(fullpath),
+ SeaweedOutputStream.getFileName(fullpath));
+ this.contentLength = SeaweedRead.fileSize(entry);
+
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public SeaweedInputStream(
+ final FilerClient filerClient,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.filerClient = filerClient;
+ this.path = path;
+ this.entry = entry;
+ this.contentLength = SeaweedRead.fileSize(entry);
+
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int numberOfBytesRead = read(b, 0, 1);
+ if (numberOfBytesRead < 0) {
+ return -1;
+ } else {
+ return (b[0] & 0xFF);
+ }
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+
+ if (b == null) {
+ throw new IllegalArgumentException("null byte array passed in to read() method");
+ }
+ if (off >= b.length) {
+ throw new IllegalArgumentException("offset greater than length of array");
+ }
+ if (len < 0) {
+ throw new IllegalArgumentException("requested read length is less than zero");
+ }
+ if (len > (b.length - off)) {
+ throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ }
+
+ ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+ return read(buf);
+
+ }
+
+ // implement ByteBufferReadable
+ public synchronized int read(ByteBuffer buf) throws IOException {
+
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+
+ long bytesRead = 0;
+ int len = buf.remaining();
+ int start = (int) this.position;
+ if (start+len <= entry.getContent().size()) {
+ entry.getContent().substring(start, start+len).copyTo(buf);
+ } else {
+ bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ }
+
+ if (bytesRead > Integer.MAX_VALUE) {
+ throw new IOException("Unexpected Content-Length");
+ }
+
+ if (bytesRead > 0) {
+ this.position += bytesRead;
+ }
+
+ return (int) bytesRead;
+ }
+
+ public synchronized void seek(long n) throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ if (n < 0) {
+ throw new EOFException("Cannot seek to a negative offset");
+ }
+ if (n > contentLength) {
+ throw new EOFException("Attempted to seek or read past the end of the file");
+ }
+ this.position = n;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ if (this.position == contentLength) {
+ if (n > 0) {
+ throw new EOFException("Attempted to seek or read past the end of the file");
+ }
+ }
+ long newPos = this.position + n;
+ if (newPos < 0) {
+ newPos = 0;
+ n = newPos - this.position;
+ }
+ if (newPos > contentLength) {
+ newPos = contentLength;
+ n = newPos - this.position;
+ }
+ seek(newPos);
+ return n;
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ final long remaining = this.contentLength - this.position;
+ return remaining <= Integer.MAX_VALUE
+ ? (int) remaining : Integer.MAX_VALUE;
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ return contentLength;
+ }
+
+ public synchronized long getPos() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ return position;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ }
+
+}
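A minimal read sketch (address and path are hypothetical): the stream resolves the entry and its visible intervals up front, serves reads that fit inside the entry's inlined content directly, and otherwise delegates to SeaweedRead.

    FilerClient filerClient = new FilerClient("localhost", 18888);
    try (SeaweedInputStream in =
                 new SeaweedInputStream(filerClient, "/buckets/test/hello.txt")) {
        byte[] buf = new byte[8192];
        for (int n; (n = in.read(buf)) != -1; ) {
            System.out.write(buf, 0, n);
        }
    }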
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 7b488a5da..ba298a713 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -1,48 +1,50 @@
-package seaweed.hdfs;
+package seaweedfs.client;
// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
import java.util.concurrent.*;
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Path path;
+ protected final boolean supportFlush = true;
+ private final FilerClient filerClient;
+ private final String path;
private final int bufferSize;
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry;
private long position;
private boolean closed;
- private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private ByteBuffer buffer;
+ private long outputIndex;
private String replication = "000";
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
+ public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
+ this(filerClient, fullpath, "000");
+ }
+
+ public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) {
+ this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, replication);
+ }
+
+ public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
- this.filerGrpcClient = filerGrpcClient;
+ this.filerClient = filerClient;
this.replication = replication;
this.path = path;
this.position = position;
@@ -50,30 +52,65 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
+ if (this.entry == null) {
+ long now = System.currentTimeMillis() / 1000L;
+
+ this.entry = FilerProto.Entry.newBuilder()
+ .setName(getFileName(path))
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(0755)
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .clearGroupName()
+ );
+ }
}
- private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+ public static String getParentDirectory(String path) {
+ int protoIndex = path.indexOf("://");
+ if (protoIndex >= 0) {
+ int pathStart = path.indexOf("/", protoIndex+3);
+ path = path.substring(pathStart);
+ }
+ if (path.equals("/")) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ if (lastSlashIndex == 0) {
+ return "/";
+ }
+ return path.substring(0, lastSlashIndex);
+ }
- LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
+ public static String getFileName(String path) {
+ if (path.indexOf("/") < 0) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(lastSlashIndex + 1);
+ }
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -87,34 +124,40 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
+ throws IOException {
maybeThrowLastError();
- Preconditions.checkArgument(data != null, "null data");
+ if (data == null) {
+ return;
+ }
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
}
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
+ int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
+
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
}
- writableBytes = bufferSize - bufferIndex;
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
}
+
}
/**
@@ -149,47 +192,53 @@ public class SeaweedOutputStream extends OutputStream {
flushInternal();
threadExecutor.shutdown();
} finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ lastError = new IOException("Stream is closed!");
+ ByteBufferPool.release(buffer);
buffer = null;
- bufferIndex = 0;
+ outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
threadExecutor.shutdownNow();
}
}
+
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
+ if (buffer.position() == 0) {
return;
}
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
+ position += submitWriteBufferToService(buffer, position);
- buffer = new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
- position += bytesLength;
+ buffer = ByteBufferPool.request(bufferSize);
+
+ }
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
+
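+ // cast through Buffer so the JDK 8-compatible flip() is linked (ByteBuffer.flip() changed its return type in JDK 9)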
+ ((Buffer) bufferToWrite).flip();
+ int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
+
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
+ final Future<Void> job = completionService.submit(() -> {
+ // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
+ // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ ByteBufferPool.release(bufferToWrite);
+ return null;
});
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
+ writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
+
+ return bytesLength;
+
}
private void waitForTaskToComplete() throws IOException {
@@ -231,13 +280,13 @@ public class SeaweedOutputStream extends OutputStream {
}
}
- private synchronized void flushInternal() throws IOException {
+ protected synchronized void flushInternal() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService();
}
- private synchronized void flushInternalAsync() throws IOException {
+ protected synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
@@ -270,10 +319,6 @@ public class SeaweedOutputStream extends OutputStream {
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
this.task = task;
this.startOffset = startOffset;
this.length = length;
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 2efa64580..384636601 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -1,88 +1,200 @@
package seaweedfs.client;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.GzipDecompressingEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.*;
public class SeaweedRead {
- // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
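+ // process-wide caches: recently fetched chunk bytes and volume id -> server locations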
+ static ChunkCache chunkCache = new ChunkCache(4);
+ static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024);
// returns bytesRead
- public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
- final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) throws IOException {
+ public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
+ final long position, final ByteBuffer buf, final long fileSize) throws IOException {
+
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+ Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
for (ChunkView chunkView : chunkViews) {
String vid = parseVolumeId(chunkView.fileId);
- lookupRequest.addVolumeIds(vid);
+ FilerProto.Locations locations = volumeIdCache.getLocations(vid);
+ if (locations == null) {
+ lookupRequest.addVolumeIds(vid);
+ } else {
+ knownLocations.put(vid, locations);
+ }
}
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
-
- Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ if (lookupRequest.getVolumeIdsCount() > 0) {
+ FilerProto.LookupVolumeResponse lookupResponse = filerClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) {
+ volumeIdCache.setLocations(entry.getKey(), entry.getValue());
+ knownLocations.put(entry.getKey(), entry.getValue());
+ }
+ }
//TODO parallel this
long readCount = 0;
- int startOffset = bufferOffset;
+ long startOffset = position;
for (ChunkView chunkView : chunkViews) {
- FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
- if (locations.getLocationsCount() == 0) {
+
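+ // a gap before this chunk holds no data; skip ahead and count it as zero-filled bytes (assumes the buffer is pre-zeroed)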
+ if (startOffset < chunkView.logicOffset) {
+ long gap = chunkView.logicOffset - startOffset;
+ LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+ buf.position(buf.position() + (int) gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
+ FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId));
+ if (locations == null || locations.getLocationsCount() == 0) {
+ LOG.error("failed to locate {}", chunkView.fileId);
// log here!
return 0;
}
- int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ int len = readChunkView(filerClient, startOffset, buf, chunkView, locations);
+
+ LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
readCount += len;
startOffset += len;
}
+ long limit = Math.min(buf.limit(), fileSize);
+
+ if (startOffset < limit) {
+ long gap = limit - startOffset;
+ LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+ buf.position(buf.position() + (int) gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
return readCount;
}
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
- HttpClient client = new DefaultHttpClient();
- HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
+
+ if (chunkData == null) {
+ chunkData = doFetchFullChunkData(filerClient, chunkView, locations);
+ chunkCache.setChunk(chunkView.fileId, chunkData);
+ }
+
+ int len = (int) chunkView.size;
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+ buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
+
+ return len;
+ }
+
+ public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] data = null;
+ IOException lastException = null;
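+ // retry every replica location, backing off by half each round (1s, 1.5s, 2.25s, ...) until ~10s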
+ for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
+ for (FilerProto.Location location : locations.getLocationsList()) {
+ String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl());
+ try {
+ data = doFetchOneFullChunkData(chunkView, url);
+ lastException = null;
+ break;
+ } catch (IOException ioe) {
+ LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
+ lastException = ioe;
+ }
+ }
+ if (data != null) {
+ break;
+ }
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ // restore the interrupt flag so the caller can still observe the cancellation
+ Thread.currentThread().interrupt();
+ }
+ }
- if (!chunkView.isFullChunk) {
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
- request.setHeader(HttpHeaders.RANGE,
- String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
+ if (lastException != null) {
+ throw lastException;
}
+ LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
+ return data;
+
+ }
+
+ private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
+
+ HttpGet request = new HttpGet(url);
+
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
+
+ byte[] data = null;
+
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
try {
- HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
- int len = (int) (chunkView.logicOffset - position + chunkView.size);
- OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
- entity.writeTo(outputStream);
- // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
+ Header contentEncodingHeader = entity.getContentEncoding();
+
+ if (contentEncodingHeader != null) {
+ HeaderElement[] encodings = contentEncodingHeader.getElements();
+ for (int i = 0; i < encodings.length; i++) {
+ if (encodings[i].getName().equalsIgnoreCase("gzip")) {
+ entity = new GzipDecompressingEntity(entity);
+ break;
+ }
+ }
+ }
+
+ data = EntityUtils.toByteArray(entity);
- return len;
+ EntityUtils.consume(entity);
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
+ response.close();
+ request.releaseConnection();
+ }
+
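+ // a chunk may be encrypted and/or gzip-compressed; decrypt first, then decompress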
+ if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
+ try {
+ data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
+ } catch (Exception e) {
+ throw new IOException("fail to decrypt", e);
}
}
+
+ if (chunkView.isCompressed) {
+ data = Gzip.decompress(data);
+ }
+
+ LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length);
+
+ return data;
+
}
protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
@@ -90,27 +202,40 @@ public class SeaweedRead {
long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) {
- if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ long chunkStart = Math.max(offset, chunk.start);
+ long chunkStop = Math.min(stop, chunk.stop);
+ if (chunkStart < chunkStop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
- chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
- isFullChunk
+ chunk.fileId,
+ chunkStart - chunk.start + chunk.chunkOffset,
+ chunkStop - chunkStart,
+ chunkStart,
+ isFullChunk,
+ chunk.cipherKey,
+ chunk.isCompressed
));
- offset = Math.min(chunk.stop, stop);
}
}
return views;
}
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+ final FilerClient filerClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+ chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList);
+
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- return (int) (a.getMtime() - b.getMtime());
+ // don't return (int) (a.getMtime() - b.getMtime()): the narrowing cast can overflow
+ if (a.getMtime() < b.getMtime()) {
+ return -1;
+ } else if (a.getMtime() > b.getMtime()) {
+ return 1;
+ }
+ return 0;
}
});
@@ -127,11 +252,14 @@ public class SeaweedRead {
List<VisibleInterval> newVisibles,
FilerProto.FileChunk chunk) {
VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- true
+ chunk.getOffset(),
+ chunk.getOffset() + chunk.getSize(),
+ chunk.getFileId(),
+ chunk.getMtime(),
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed()
);
// easy cases to speed up
@@ -147,21 +275,27 @@ public class SeaweedRead {
for (VisibleInterval v : visibles) {
if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- false
+ v.start,
+ chunk.getOffset(),
+ v.fileId,
+ v.modifiedTime,
+ v.chunkOffset,
+ false,
+ v.cipherKey,
+ v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
if (v.start < chunkStop && chunkStop < v.stop) {
newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false
+ chunkStop,
+ v.stop,
+ v.fileId,
+ v.modifiedTime,
+ v.chunkOffset + (chunkStop - v.start),
+ false,
+ v.cipherKey,
+ v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -191,6 +325,10 @@ public class SeaweedRead {
return fileId;
}
+ public static long fileSize(FilerProto.Entry entry) {
+ return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ }
+
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
long size = 0;
for (FilerProto.FileChunk chunk : chunksList) {
@@ -207,25 +345,33 @@ public class SeaweedRead {
public final long stop;
public final long modifiedTime;
public final String fileId;
+ public final long chunkOffset;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
+ this.chunkOffset = chunkOffset;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isCompressed = isCompressed;
}
@Override
public String toString() {
return "VisibleInterval{" +
- "start=" + start +
- ", stop=" + stop +
- ", modifiedTime=" + modifiedTime +
- ", fileId='" + fileId + '\'' +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "start=" + start +
+ ", stop=" + stop +
+ ", modifiedTime=" + modifiedTime +
+ ", fileId='" + fileId + '\'' +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isCompressed=" + isCompressed +
+ '}';
}
}
@@ -235,24 +381,30 @@ public class SeaweedRead {
public final long size;
public final long logicOffset;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isCompressed = isCompressed;
}
@Override
public String toString() {
return "ChunkView{" +
- "fileId='" + fileId + '\'' +
- ", offset=" + offset +
- ", size=" + size +
- ", logicOffset=" + logicOffset +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "fileId='" + fileId + '\'' +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", logicOffset=" + logicOffset +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isCompressed=" + isCompressed +
+ '}';
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..c465d935f
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,30 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
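+ // a single pooled HTTP client shared by all read/write requests, so connections get reused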
+ static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+ static CloseableHttpClient httpClient;
+
+ static {
+ // Increase max total connection to 200
+ cm.setMaxTotal(200);
+ // Increase default max connection per route to 20
+ cm.setDefaultMaxPerRoute(20);
+
+ httpClient = HttpClientBuilder.create()
+ .setConnectionManager(cm)
+ .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+ .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+ .build();
+ }
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return httpClient;
+ }
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 0663e8d98..f8c0c76b6 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,68 +1,114 @@
package seaweedfs.client;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import com.google.protobuf.ByteString;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.security.SecureRandom;
+import java.util.List;
public class SeaweedWrite {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedWrite.class);
+
+ private static final SecureRandom random = new SecureRandom();
+
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
- final FilerGrpcClient filerGrpcClient,
+ final FilerClient filerClient,
final long offset,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
- FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
+ final long bytesOffset, final long bytesLength,
+ final String path) throws IOException {
+ FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
+ replication, filerClient, offset, bytes, bytesOffset, bytesLength, path);
+ synchronized (entry) {
+ entry.addChunks(chunkBuilder);
+ }
+ }
+
+ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength,
+ final String path) throws IOException {
+ FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection("")
- .setReplication(replication)
+ .setCollection(filerClient.getCollection())
+ .setReplication(replication == null ? filerClient.getReplication() : replication)
.setDataCenter("")
- .setReplication("")
.setTtlSec(0)
+ .setPath(path)
.build());
String fileId = response.getFileId();
- String url = response.getUrl();
String auth = response.getAuth();
- String targetUrl = String.format("http://%s/%s", url, fileId);
- String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength);
+ String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl());
- entry.addChunks(FilerProto.FileChunk.newBuilder()
+ ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
+ byte[] cipherKey = null;
+ if (filerClient.isCipher()) {
+ cipherKey = genCipherKey();
+ cipherKeyString = ByteString.copyFrom(cipherKey);
+ }
+
+ String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
+
+ LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
+
+ return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
- );
-
+ .setCipherKey(cipherKeyString);
}
- public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
- filerGrpcClient.getBlockingStub().createEntry(
- FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parentDirectory)
- .setEntry(entry)
- .build()
- );
+ public static void writeMeta(final FilerClient filerClient,
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
+
+ synchronized (entry) {
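+ // consolidate the chunk list into manifest chunks when it grows large, then persist the entry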
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
+ entry.clearChunks();
+ entry.addAllChunks(chunks);
+ filerClient.getBlockingStub().createEntry(
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parentDirectory)
+ .setEntry(entry)
+ .build()
+ );
+ }
}
private static String multipartUpload(String targetUrl,
String auth,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
-
- HttpClient client = new DefaultHttpClient();
-
- InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
+
+ InputStream inputStream = null;
+ if (cipherKey == null || cipherKey.length == 0) {
+ inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ } else {
+ try {
+ byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey);
+ inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length);
+ } catch (Exception e) {
+ throw new IOException("fail to encrypt data", e);
+ }
+ }
HttpPost post = new HttpPost(targetUrl);
if (auth != null && auth.length() != 0) {
@@ -74,8 +120,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
try {
- HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@@ -83,13 +130,19 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
+ EntityUtils.consume(response.getEntity());
+
return etag;
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ post.releaseConnection();
}
}
+
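+ // fresh 256-bit random key per chunk, used for AES-256-GCM encryption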
+ private static byte[] genCipherKey() {
+ byte[] b = new byte[32];
+ random.nextBytes(b);
+ return b;
+ }
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
new file mode 100644
index 000000000..fd2649cc2
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
@@ -0,0 +1,36 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class VolumeIdCache {
+
+ private Cache<String, FilerProto.Locations> cache = null;
+
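+ // maxEntries == 0 leaves the cache disabled, so every lookup goes to the filer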
+ public VolumeIdCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(5, TimeUnit.MINUTES)
+ .build();
+ }
+
+ public FilerProto.Locations getLocations(String volumeId) {
+ if (this.cache == null) {
+ return null;
+ }
+ return this.cache.getIfPresent(volumeId);
+ }
+
+ public void setLocations(String volumeId, FilerProto.Locations locations) {
+ if (this.cache == null) {
+ return;
+ }
+ this.cache.put(volumeId, locations);
+ }
+
+}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index ef847cbe7..ac4c9a0e7 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
package filer_pb;
+option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "FilerProto";
@@ -21,6 +22,9 @@ service SeaweedFiler {
rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) {
}
+ rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) {
+ }
+
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
}
@@ -33,6 +37,9 @@ service SeaweedFiler {
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
}
+ rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
+ }
+
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
}
@@ -42,6 +49,24 @@ service SeaweedFiler {
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
+ rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
+ }
+
+ rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
+ }
+
+ rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+ }
+
+ rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -73,6 +98,9 @@ message Entry {
repeated FileChunk chunks = 3;
FuseAttributes attributes = 4;
map<string, bytes> extended = 5;
+ bytes hard_link_id = 7;
+ int32 hard_link_counter = 8; // only exists in hard link meta data
+ bytes content = 9; // if not empty, the file content
}
message FullEntry {
@@ -85,6 +113,8 @@ message EventNotification {
Entry new_entry = 2;
bool delete_chunks = 3;
string new_parent_path = 4;
+ bool is_from_other_cluster = 5;
+ repeated int32 signatures = 6;
}
message FileChunk {
@@ -96,6 +126,13 @@ message FileChunk {
string source_file_id = 6; // to be deprecated
FileId fid = 7;
FileId source_fid = 8;
+ bytes cipher_key = 9;
+ bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+ repeated FileChunk chunks = 1;
}
message FileId {
@@ -118,23 +155,39 @@ message FuseAttributes {
string user_name = 11; // for hdfs
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
+ bytes md5 = 14;
+ string disk_type = 15;
}
message CreateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool o_excl = 3;
+ bool is_from_other_cluster = 4;
+ repeated int32 signatures = 5;
}
message CreateEntryResponse {
+ string error = 1;
}
message UpdateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool is_from_other_cluster = 3;
+ repeated int32 signatures = 4;
}
message UpdateEntryResponse {
}
+message AppendToEntryRequest {
+ string directory = 1;
+ string entry_name = 2;
+ repeated FileChunk chunks = 3;
+}
+message AppendToEntryResponse {
+}
+
message DeleteEntryRequest {
string directory = 1;
string name = 2;
@@ -142,9 +195,12 @@ message DeleteEntryRequest {
bool is_delete_data = 4;
bool is_recursive = 5;
bool ignore_recursive_error = 6;
+ bool is_from_other_cluster = 7;
+ repeated int32 signatures = 8;
}
message DeleteEntryResponse {
+ string error = 1;
}
message AtomicRenameEntryRequest {
@@ -163,6 +219,9 @@ message AssignVolumeRequest {
string replication = 3;
int32 ttl_sec = 4;
string data_center = 5;
+ string path = 6;
+ string rack = 7;
+ string disk_type = 8;
}
message AssignVolumeResponse {
@@ -171,6 +230,9 @@ message AssignVolumeResponse {
string public_url = 3;
int32 count = 4;
string auth = 5;
+ string collection = 6;
+ string replication = 7;
+ string error = 8;
}
message LookupVolumeRequest {
@@ -189,6 +251,16 @@ message LookupVolumeResponse {
map<string, Locations> locations_map = 1;
}
+message Collection {
+ string name = 1;
+}
+message CollectionListRequest {
+ bool include_normal_volumes = 1;
+ bool include_ec_volumes = 2;
+}
+message CollectionListResponse {
+ repeated Collection collections = 1;
+}
message DeleteCollectionRequest {
string collection = 1;
}
@@ -200,11 +272,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
+ string disk_type = 4;
}
message StatisticsResponse {
- string replication = 1;
- string collection = 2;
- string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@@ -217,4 +287,80 @@ message GetFilerConfigurationResponse {
string replication = 2;
string collection = 3;
uint32 max_mb = 4;
+ string dir_buckets = 5;
+ bool cipher = 7;
+ int32 signature = 8;
+ string metrics_address = 9;
+ int32 metrics_interval_sec = 10;
+}
+
+message SubscribeMetadataRequest {
+ string client_name = 1;
+ string path_prefix = 2;
+ int64 since_ns = 3;
+ int32 signature = 4;
+}
+message SubscribeMetadataResponse {
+ string directory = 1;
+ EventNotification event_notification = 2;
+ int64 ts_ns = 3;
+}
+
+message LogEntry {
+ int64 ts_ns = 1;
+ int32 partition_key_hash = 2;
+ bytes data = 3;
+}
+
+message KeepConnectedRequest {
+ string name = 1;
+ uint32 grpc_port = 2;
+ repeated string resources = 3;
+}
+message KeepConnectedResponse {
+}
+
+message LocateBrokerRequest {
+ string resource = 1;
+}
+message LocateBrokerResponse {
+ bool found = 1;
+ // if found, send the exact address
+ // if not found, send the full list of existing brokers
+ message Resource {
+ string grpc_addresses = 1;
+ int32 resource_count = 2;
+ }
+ repeated Resource resources = 2;
+}
+
+// Key-Value operations
+message KvGetRequest {
+ bytes key = 1;
+}
+message KvGetResponse {
+ bytes value = 1;
+ string error = 2;
+}
+message KvPutRequest {
+ bytes key = 1;
+ bytes value = 2;
+}
+message KvPutResponse {
+ string error = 1;
+}
+
+// path-based configurations
+message FilerConf {
+ int32 version = 1;
+ message PathConf {
+ string location_prefix = 1;
+ string collection = 2;
+ string replication = 3;
+ string ttl = 4;
+ string disk_type = 5;
+ bool fsync = 6;
+ uint32 volume_growth_count = 7;
+ }
+ repeated PathConf locations = 2;
}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
new file mode 100644
index 000000000..7b5e53e19
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
@@ -0,0 +1,42 @@
+package seaweedfs.client;
+
+import org.junit.Test;
+
+import java.util.Base64;
+
+import static seaweedfs.client.SeaweedCipher.decrypt;
+import static seaweedfs.client.SeaweedCipher.encrypt;
+
+public class SeaweedCipherTest {
+
+ @Test
+ public void testSameAsGoImplementation() throws Exception {
+ byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+ @Test
+ public void testEncryptDecrypt() throws Exception {
+ byte[] secretKey = SeaweedCipher.genCipherKey();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
- public void testNonOverlappingVisibleIntervals() {
+ public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}
diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml
new file mode 100644
index 000000000..950a3f494
--- /dev/null
+++ b/other/java/examples/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.example</groupId>
+ <artifactId>unzip</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.6.4</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-hadoop2-client</artifactId>
+ <version>1.6.4</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.9.2</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java
new file mode 100644
index 000000000..d2eb94135
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java
@@ -0,0 +1,48 @@
+package com.seaweedfs.examples;
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public class ExampleReadFile {
+
+ public static void main(String[] args) throws IOException {
+
+ FilerClient filerClient = new FilerClient("localhost", 18888);
+
+ long startTime = System.currentTimeMillis();
+ parseZip("/Users/chris/tmp/test.zip");
+
+ long startTime2 = System.currentTimeMillis();
+
+ long localProcessTime = startTime2 - startTime;
+
+ SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
+ filerClient, "/test.zip");
+ parseZip(seaweedInputStream);
+
+ long swProcessTime = System.currentTimeMillis() - startTime2;
+
+ System.out.println("Local time: " + localProcessTime);
+ System.out.println("SeaweedFS time: " + swProcessTime);
+
+ }
+
+ public static void parseZip(String filename) throws IOException {
+ FileInputStream fileInputStream = new FileInputStream(filename);
+ parseZip(fileInputStream);
+ }
+
+ public static void parseZip(InputStream is) throws IOException {
+ ZipInputStream zin = new ZipInputStream(is);
+ ZipEntry ze;
+ while ((ze = zin.getNextEntry()) != null) {
+ System.out.println(ze.getName());
+ }
+ }
+}
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java
new file mode 100644
index 000000000..72c572d31
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java
@@ -0,0 +1,46 @@
+package com.seaweedfs.examples;
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+
+public class ExampleWatchFileChanges {
+
+ public static void main(String[] args) throws IOException {
+
+ FilerClient filerClient = new FilerClient("localhost", 18888);
+
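+ // subscribe to metadata events starting one hour in the past (timestamps are in nanoseconds)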
+ long sinceNs = (System.currentTimeMillis() - 3600 * 1000) * 1000000L;
+
+ Iterator<FilerProto.SubscribeMetadataResponse> watch = filerClient.watch(
+ "/buckets",
+ "exampleClientName",
+ sinceNs
+ );
+
+ System.out.println("Connected to filer, subscribing from " + new Date());
+
+ while (watch.hasNext()) {
+ FilerProto.SubscribeMetadataResponse event = watch.next();
+ FilerProto.EventNotification notification = event.getEventNotification();
+ if (!event.getDirectory().equals(notification.getNewParentPath())) {
+ // move an entry to a new directory, possibly with a new name
+ if (notification.hasOldEntry() && notification.hasNewEntry()) {
+ System.out.println("moved " + event.getDirectory() + "/" + notification.getOldEntry().getName() + " to " + notification.getNewParentPath() + "/" + notification.getNewEntry().getName());
+ } else {
+ System.out.println("this should not happen.");
+ }
+ } else if (notification.hasNewEntry() && !notification.hasOldEntry()) {
+ System.out.println("created entry " + event.getDirectory() + "/" + notification.getNewEntry().getName());
+ } else if (!notification.hasNewEntry() && notification.hasOldEntry()) {
+ System.out.println("deleted entry " + event.getDirectory() + "/" + notification.getOldEntry().getName());
+ } else if (notification.hasNewEntry() && notification.hasOldEntry()) {
+ System.out.println("updated entry " + event.getDirectory() + "/" + notification.getNewEntry().getName());
+ }
+ }
+
+ }
+}
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java
new file mode 100644
index 000000000..26b74028f
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java
@@ -0,0 +1,47 @@
+package com.seaweedfs.examples;
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.SeaweedInputStream;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public class ExampleWriteFile {
+
+ public static void main(String[] args) throws IOException {
+
+ FilerClient filerClient = new FilerClient("localhost", 18888);
+
+ SeaweedInputStream seaweedInputStream = new SeaweedInputStream(filerClient, "/test.zip");
+ unZipFiles(filerClient, seaweedInputStream);
+
+ }
+
+ public static void unZipFiles(FilerClient filerClient, InputStream is) throws IOException {
+ ZipInputStream zin = new ZipInputStream(is);
+ ZipEntry ze;
+ while ((ze = zin.getNextEntry()) != null) {
+
+ String filename = ze.getName();
+ if (filename.indexOf("/") >= 0) {
+ filename = filename.substring(filename.lastIndexOf("/") + 1);
+ }
+ if (filename.length() == 0) {
+ continue;
+ }
+
+ SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerClient, "/test/" + filename);
+ byte[] bytesIn = new byte[16 * 1024];
+ int read = 0;
+ while ((read = zin.read(bytesIn)) != -1) {
+ seaweedOutputStream.write(bytesIn, 0, read);
+ }
+ seaweedOutputStream.close();
+
+ System.out.println(ze.getName());
+ }
+ }
+}
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java
new file mode 100644
index 000000000..006c581c9
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java
@@ -0,0 +1,25 @@
+package com.seaweedfs.examples;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.*;
+
+public class HdfsCopyFile {
+ public static void main(String[] args) throws IOException {
+ Configuration configuration = new Configuration();
+
+ configuration.set("fs.defaultFS", "seaweedfs://localhost:8888");
+ configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+
+ FileSystem fs = FileSystem.get(configuration);
+ String source = "/Users/chris/tmp/test.zip";
+ String destination = "/buckets/spark/test01.zip";
+ InputStream in = new BufferedInputStream(new FileInputStream(source));
+
+ OutputStream out = fs.create(new Path(destination));
+ IOUtils.copyBytes(in, out, 4096, true);
+ }
+}
diff --git a/other/java/hdfs-over-ftp/pom.xml b/other/java/hdfs-over-ftp/pom.xml
new file mode 100644
index 000000000..0db422db5
--- /dev/null
+++ b/other/java/hdfs-over-ftp/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>hdfs-over-ftp</groupId>
+ <artifactId>hdfs-over-ftp</artifactId>
+ <version>1.0</version>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.4.3</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>2.9.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger-ui</artifactId>
+ <version>2.9.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ftpserver</groupId>
+ <artifactId>ftpserver-core</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-hadoop3-client</artifactId>
+ <version>1.6.2</version>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <encoding>UTF-8</encoding>
+ <compilerArguments>
+ <verbose />
+ <bootclasspath>${java.home}/lib/rt.jar</bootclasspath>
+ </compilerArguments>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.hadoop.seaweed.ftp.ApplicationServer</mainClass>
+ <addClasspath>true</addClasspath>
+ <classpathPrefix>lib/</classpathPrefix>
+ </manifest>
+ <manifestEntries>
+ <Class-Path>./</Class-Path>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptors>
+ <descriptor>src/main/resources/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java
new file mode 100644
index 000000000..b8ef1d840
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.seaweed.ftp;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class ApplicationServer {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ApplicationServer.class, args);
+ }
+
+}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java
new file mode 100644
index 000000000..3c395493d
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.seaweed.ftp.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import springfox.documentation.builders.ApiInfoBuilder;
+import springfox.documentation.builders.PathSelectors;
+import springfox.documentation.builders.RequestHandlerSelectors;
+import springfox.documentation.spi.DocumentationType;
+import springfox.documentation.spring.web.plugins.Docket;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+@Configuration
+@EnableSwagger2
+public class SwaggerConfig {
+ @Bean
+ public Docket createRestApi() {
+ return new Docket(DocumentationType.SWAGGER_2)
+ .pathMapping("/")
+ .select()
+ .apis(RequestHandlerSelectors.basePackage("org.apache.hadoop.seaweed.ftp"))
+ .paths(PathSelectors.any())
+ .build().apiInfo(new ApiInfoBuilder()
+ .title("FTP API Doc")
+ .version("1.0")
+ .build());
+ }
+}
\ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java
new file mode 100644
index 000000000..7a5a4e74d
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java
@@ -0,0 +1,71 @@
+package org.apache.hadoop.seaweed.ftp.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.hadoop.seaweed.ftp.service.HFtpService;
+import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
+import org.apache.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/manager")
+@Api(tags = "FTP操作管理")
+public class FtpManagerController {
+
+ private static Logger log = Logger.getLogger(FtpManagerController.class);
+
+ @Autowired
+ private HFtpService hdfsOverFtpServer;
+
+ @GetMapping("/status")
+ @ApiOperation("查看FTP服务状态")
+ public Result status() {
+ Map map = new HashMap<>();
+ try {
+ boolean status = hdfsOverFtpServer.statusServer();
+ map.put("is_running", status);
+ return new Result(true, map, "FTP service status retrieved successfully");
+ } catch (Exception e) {
+ log.error(e);
+ map.put("is_running", false);
+ return new Result(true, map, "FTP service status retrieved successfully");
+ }
+ }
+
+ @PutMapping("/start")
+ @ApiOperation("启动FTP服务")
+ public Result start() {
+ try {
+ boolean status = hdfsOverFtpServer.statusServer();
+ if (!status) {
+ hdfsOverFtpServer.startServer();
+ }
+ return new Result(true, "FTP 服务启动成功");
+ }catch (Exception e) {
+ log.error(e);
+ return new Result(false, "FTP 服务启动失败");
+ }
+ }
+
+ @PutMapping("/stop")
+ @ApiOperation("停止FTP服务")
+ public Result stop() {
+ try {
+ boolean status = hdfsOverFtpServer.statusServer();
+ if (status) {
+ hdfsOverFtpServer.stopServer();
+ }
+ return new Result(true, "FTP 服务停止成功");
+ }catch (Exception e) {
+ log.error(e);
+ return new Result(false, "FTP 服务停止失败");
+ }
+ }
+}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java
new file mode 100644
index 000000000..c4d2261b3
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java
@@ -0,0 +1,98 @@
+package org.apache.hadoop.seaweed.ftp.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.usermanager.Md5PasswordEncryptor;
+import org.apache.ftpserver.usermanager.UserFactory;
+import org.apache.hadoop.seaweed.ftp.controller.vo.FtpUser;
+import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
+import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
+import org.apache.log4j.Logger;
+import org.springframework.web.bind.annotation.*;
+
+import java.io.File;
+
+@RestController
+@RequestMapping("/user")
+@Api(tags = "FTP用户管理")
+public class UserController {
+
+ private static Logger log = Logger.getLogger(UserController.class);
+
+ /***
+ * {
+ * "name": "test",
+ * "password": "test",
+ * "homeDirectory": "/buckets/test/"
+ * }
+ * @param ftpUser
+ * @return
+ */
+ @PostMapping("/add")
+ @ApiOperation("新增/编辑用户")
+ public Result add(@RequestBody FtpUser ftpUser) {
+ try {
+ HdfsUserManager userManagerFactory = new HdfsUserManager();
+ userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
+ userManagerFactory.setPasswordEncryptor(new Md5PasswordEncryptor());
+
+ UserFactory userFactory = new UserFactory();
+ userFactory.setHomeDirectory(ftpUser.getHomeDirectory());
+ userFactory.setName(ftpUser.getName());
+ userFactory.setPassword(ftpUser.getPassword());
+ userFactory.setEnabled(ftpUser.isEnabled());
+ userFactory.setMaxIdleTime(ftpUser.getMaxIdleTime());
+
+ User user = userFactory.createUser();
+ userManagerFactory.save(user, ftpUser.isRenamePush());
+ return new Result(true, "新建用户成功");
+ }catch (Exception e) {
+ log.error(e);
+ return new Result(false, "新建用户失败");
+ }
+ }
+
+ @DeleteMapping("/delete/{user}")
+ @ApiOperation("删除用户")
+ public Result delete(@PathVariable(value = "user") String user) {
+ try {
+ HdfsUserManager userManagerFactory = new HdfsUserManager();
+ userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
+ userManagerFactory.delete(user);
+ return new Result(true, "删除用户成功");
+ }catch (Exception e) {
+ log.error(e);
+ return new Result(false, "删除用户失败");
+ }
+ }
+
+ @GetMapping("/show/{userName}")
+ @ApiOperation("查看用户")
+ public Result show(@PathVariable(value = "userName") String userName) {
+ try {
+ HdfsUserManager userManagerFactory = new HdfsUserManager();
+ userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
+ User user = userManagerFactory.getUserByName(userName);
+ FtpUser ftpUser = new FtpUser(user.getHomeDirectory(), user.getPassword(), user.getEnabled(), user.getName(), user.getMaxIdleTime(), HdfsUserManager.getUserRenamePush(userName));
+ return new Result(true, ftpUser, "User information retrieved successfully");
+ } catch (Exception e) {
+ log.error(e);
+ return new Result(false, "Failed to retrieve user information");
+ }
+ }
+
+ @GetMapping("/list")
+ @ApiOperation("列举用户")
+ public Result list() {
+ try {
+ HdfsUserManager userManagerFactory = new HdfsUserManager();
+ userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
+ String[] allUserNames = userManagerFactory.getAllUserNames();
+ return new Result(true, allUserNames, "Users listed successfully");
+ } catch (Exception e) {
+ log.error(e);
+ return new Result(false, "Failed to list users");
+ }
+ }
+}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java
new file mode 100644
index 000000000..953d08603
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java
@@ -0,0 +1,71 @@
+package org.apache.hadoop.seaweed.ftp.controller.vo;
+
+public class FtpUser {
+
+ private String homeDirectory;
+ private String password;
+ private boolean enabled;
+ private String name;
+ private int maxIdleTime;
+ private boolean renamePush;
+
+ public FtpUser() {
+ }
+
+ public FtpUser(String homeDirectory, String password, boolean enabled, String name, int maxIdleTime, boolean renamePush) {
+ this.homeDirectory = homeDirectory;
+ this.password = password;
+ this.enabled = enabled;
+ this.name = name;
+ this.maxIdleTime = maxIdleTime;
+ this.renamePush = renamePush;
+ }
+
+ public String getHomeDirectory() {
+ return homeDirectory;
+ }
+
+ public void setHomeDirectory(String homeDirectory) {
+ this.homeDirectory = homeDirectory;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getMaxIdleTime() {
+ return maxIdleTime;
+ }
+
+ public void setMaxIdleTime(int maxIdleTime) {
+ this.maxIdleTime = maxIdleTime;
+ }
+
+ public boolean isRenamePush() {
+ return renamePush;
+ }
+
+ public void setRenamePush(boolean renamePush) {
+ this.renamePush = renamePush;
+ }
+}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java
new file mode 100644
index 000000000..b6a480ba7
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java
@@ -0,0 +1,43 @@
+package org.apache.hadoop.seaweed.ftp.controller.vo;
+
+public class Result {
+
+ private boolean status;
+ private Object data;
+ private String message;
+
+ public Result(boolean status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public Result(boolean status, Object data, String message) {
+ this.status = status;
+ this.message = message;
+ this.data = data;
+ }
+
+ public boolean isStatus() {
+ return status;
+ }
+
+ public void setStatus(boolean status) {
+ this.status = status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public Object getData() {
+ return data;
+ }
+
+ public void setData(Object data) {
+ this.data = data;
+ }
+}
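
Result is the envelope every controller method above returns; Spring serializes it through the getters, so responses take the shape {"status":...,"data":{...},"message":"..."}. A minimal sketch pairing it with FtpUser (assumes the sketch lives in the same vo package as both classes):

    // Assumes same package as Result and FtpUser; values are examples.
    public class ResultSketch {
        public static void main(String[] args) {
            FtpUser user = new FtpUser("/buckets/test/", "secret", true, "test", 0, true);
            Result ok = new Result(true, user, "User info retrieved");
            // status=true message=User info retrieved
            System.out.println("status=" + ok.isStatus() + " message=" + ok.getMessage());
        }
    }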
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java
new file mode 100644
index 000000000..c3fa31872
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java
@@ -0,0 +1,102 @@
+package org.apache.hadoop.seaweed.ftp.service;
+
+import org.apache.ftpserver.DataConnectionConfiguration;
+import org.apache.ftpserver.DataConnectionConfigurationFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsFileSystemManager;
+import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsOverFtpSystem;
+import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
+import org.apache.log4j.Logger;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+
+/**
+ * reference: https://github.com/iponweb/hdfs-over-ftp
+ */
+@Component
+public class HFtpService {
+
+ private static Logger log = Logger.getLogger(HFtpService.class);
+
+ @Value("${ftp.port}")
+ private int port = 0;
+
+ @Value("${ftp.passive-address}")
+ private String passiveAddress;
+
+ @Value("${ftp.passive-ports}")
+ private String passivePorts;
+
+ @Value("${hdfs.uri}")
+ private String hdfsUri;
+
+ @Value("${seaweedFs.enable}")
+ private boolean seaweedFsEnable;
+
+ @Value("${seaweedFs.access}")
+ private String seaweedFsAccess;
+
+ @Value("${seaweedFs.replication}")
+ private String seaweedFsReplication;
+
+ private FtpServer ftpServer = null;
+
+ public void startServer() throws Exception {
+ log.info("Starting HDFS-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
+
+ HdfsOverFtpSystem.setHdfsUri(hdfsUri);
+ HdfsOverFtpSystem.setSeaweedFsEnable(seaweedFsEnable);
+ HdfsOverFtpSystem.setSeaweedFsAccess(seaweedFsAccess);
+ HdfsOverFtpSystem.setSeaweedFsReplication(seaweedFsReplication);
+
+ FtpServerFactory server = new FtpServerFactory();
+ server.setFileSystem(new HdfsFileSystemManager());
+
+ ListenerFactory factory = new ListenerFactory();
+ factory.setPort(port);
+
+ DataConnectionConfigurationFactory dccFactory = new DataConnectionConfigurationFactory();
+ dccFactory.setPassiveAddress("0.0.0.0");
+ dccFactory.setPassivePorts(passivePorts);
+ dccFactory.setPassiveExternalAddress(passiveAddress);
+ DataConnectionConfiguration dcc = dccFactory.createDataConnectionConfiguration();
+ factory.setDataConnectionConfiguration(dcc);
+
+ server.addListener("default", factory.createListener());
+
+ HdfsUserManager userManager = new HdfsUserManager();
+ final File file = loadResource("/users.properties");
+ userManager.setFile(file);
+ server.setUserManager(userManager);
+
+ CommandFactoryFactory cmFact = new CommandFactoryFactory();
+ cmFact.setUseDefaultCommands(true);
+ server.setCommandFactory(cmFact.createCommandFactory());
+
+ // start the server
+ ftpServer = server.createServer();
+ ftpServer.start();
+ }
+
+ public void stopServer() {
+ log.info("Stopping Hdfs-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
+ ftpServer.stop();
+ }
+
+ public boolean statusServer() {
+ try {
+ return !ftpServer.isStopped();
+ }catch (Exception e) {
+ return false;
+ }
+ }
+
+ private static File loadResource(String resourceName) {
+ return new File(System.getProperty("user.dir") + resourceName);
+ }
+} \ No newline at end of file
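
A minimal sketch of driving this service's lifecycle from Spring Boot. The FtpBootstrap class here is purely illustrative (the module wires its own startup); it only shows that startServer()/statusServer() can be called once the @Value fields are resolved:

    import org.apache.hadoop.seaweed.ftp.service.HFtpService;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    // Illustrative only: a runner that starts the FTP bridge after the context is up.
    @Component
    public class FtpBootstrap implements CommandLineRunner {

        private final HFtpService ftp;

        public FtpBootstrap(HFtpService ftp) {
            this.ftp = ftp; // injected with the @Value fields already resolved
        }

        @Override
        public void run(String... args) throws Exception {
            ftp.startServer();                                     // binds ftp.port, begins accepting logins
            System.out.println("running: " + ftp.statusServer()); // true while the listener is up
        }
    }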
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java
new file mode 100644
index 000000000..e97c2dc14
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java
@@ -0,0 +1,333 @@
+package org.apache.hadoop.seaweed.ftp.service.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.seaweed.ftp.users.HdfsUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class implements all actions to HDFS
+ */
+public class HdfsFileObject implements FtpFile {
+
+ private final Logger log = LoggerFactory.getLogger(HdfsFileObject.class);
+
+ private Path homePath;
+ private Path path;
+ private Path fullPath;
+ private HdfsUser user;
+
+ /**
+ * Constructs an HdfsFileObject from a path
+ *
+ * @param homePath the user's home directory, used as the path prefix
+ * @param path path of the object, relative to the home directory
+ * @param user accessor of the object
+ */
+ public HdfsFileObject(String homePath, String path, User user) {
+ this.homePath = new Path(homePath);
+ this.path = new Path(path);
+ this.fullPath = new Path(homePath + path);
+ this.user = (HdfsUser) user;
+ }
+
+ public String getAbsolutePath() {
+ // strip the last '/' if necessary
+ String fullName = path.toString();
+ int filelen = fullName.length();
+ if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
+ fullName = fullName.substring(0, filelen - 1);
+ }
+
+ return fullName;
+ }
+
+ public String getName() {
+ return path.getName();
+ }
+
+ /**
+ * HDFS has no hidden objects
+ *
+ * @return always false
+ */
+ public boolean isHidden() {
+ return false;
+ }
+
+ /**
+ * Checks if the object is a directory
+ *
+ * @return true if the object is a directory
+ */
+ public boolean isDirectory() {
+ try {
+ log.debug("is directory? : " + fullPath);
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FileStatus fs = dfs.getFileStatus(fullPath);
+ return fs.isDir();
+ } catch (IOException e) {
+ log.debug(fullPath + " is not dir", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if the object is a file
+ *
+ * @return true if the object is a file
+ */
+ public boolean isFile() {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ return dfs.isFile(fullPath);
+ } catch (IOException e) {
+ log.debug(fullPath + " is not file", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if the object exists
+ *
+ * @return true if the object exists
+ */
+ public boolean doesExist() {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ dfs.getFileStatus(fullPath);
+ return true;
+ } catch (IOException e) {
+ // log.debug(path + " does not exist", e);
+ return false;
+ }
+ }
+
+ public boolean isReadable() {
+ return true;
+ }
+
+ public boolean isWritable() {
+ return true;
+ }
+
+ public boolean isRemovable() {
+ return true;
+ }
+
+ /**
+ * Get owner of the object
+ *
+ * @return owner of the object
+ */
+ public String getOwnerName() {
+ return "root";
+ /*
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FileStatus fs = dfs.getFileStatus(fullPath);
+ String owner = fs.getOwner();
+ if(owner.length() == 0) {
+ return "root";
+ }
+ return owner;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ */
+ }
+
+ /**
+ * Get group of the object
+ *
+ * @return group of the object
+ */
+ public String getGroupName() {
+ return "root";
+ /*
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FileStatus fs = dfs.getFileStatus(fullPath);
+ String group = fs.getGroup();
+ if(group.length() == 0) {
+ return "root";
+ }
+ return group;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ */
+ }
+
+ /**
+ * Get link count
+ *
+ * @return 3 for a directory and 1 for a file
+ */
+ public int getLinkCount() {
+ return isDirectory() ? 3 : 1;
+ }
+
+ /**
+ * Get last modification date
+ *
+ * @return last modification date as a long
+ */
+ public long getLastModified() {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FileStatus fs = dfs.getFileStatus(fullPath);
+ return fs.getModificationTime();
+ } catch (IOException e) {
+ e.printStackTrace();
+ return 0;
+ }
+ }
+
+ public boolean setLastModified(long l) {
+ return false;
+ }
+
+ /**
+ * Get the size of the object
+ *
+ * @return size of the object in bytes
+ */
+ public long getSize() {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FileStatus fs = dfs.getFileStatus(fullPath);
+ log.debug("getSize(): " + fullPath + " : " + fs.getLen());
+ return fs.getLen();
+ } catch (IOException e) {
+ e.printStackTrace();
+ return 0;
+ }
+ }
+
+ public Object getPhysicalFile() {
+ return null;
+ }
+
+ /**
+ * Creates a new directory at this path
+ *
+ * @return true if the directory was created
+ */
+ public boolean mkdir() {
+ try {
+ FileSystem fs = HdfsOverFtpSystem.getDfs();
+ fs.mkdirs(fullPath);
+// fs.setOwner(path, user.getName(), user.getMainGroup());
+ return true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ /**
+ * Delete object from the HDFS filesystem
+ *
+ * @return true if the object is deleted
+ */
+ public boolean delete() {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ dfs.delete(fullPath, true);
+ return true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public boolean move(FtpFile ftpFile) {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ dfs.rename(fullPath, new Path(fullPath.getParent() + File.separator + ftpFile.getName()));
+ return true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+
+ /**
+ * List files of the directory
+ *
+ * @return List of files in the directory
+ */
+ public List<FtpFile> listFiles() {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FileStatus[] fileStats = dfs.listStatus(fullPath);
+
+ // get the virtual name of the base directory
+ String virtualFileStr = getAbsolutePath();
+ if (virtualFileStr.charAt(virtualFileStr.length() - 1) != '/') {
+ virtualFileStr += '/';
+ }
+
+ FtpFile[] virtualFiles = new FtpFile[fileStats.length];
+ for (int i = 0; i < fileStats.length; i++) {
+ File fileObj = new File(fileStats[i].getPath().toString());
+ String fileName = virtualFileStr + fileObj.getName();
+ virtualFiles[i] = new HdfsFileObject(homePath.toString(), fileName, user);
+ }
+ return Collections.unmodifiableList(Arrays.asList(virtualFiles));
+ } catch (IOException e) {
+ log.debug("", e);
+ return null;
+ }
+ }
+
+ /**
+ * Creates an output stream to write to the object
+ *
+ * @param l offset, not used here
+ * @return OutputStream, or null if the stream could not be created
+ */
+ public OutputStream createOutputStream(long l) {
+ try {
+ FileSystem fs = HdfsOverFtpSystem.getDfs();
+ FSDataOutputStream out = fs.create(fullPath);
+// fs.setOwner(fullPath, user.getName(), user.getMainGroup());
+ return out;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ /**
+ * Creates an input stream to read from the object
+ *
+ * @param l offset, not used here
+ * @return InputStream, or null if the stream could not be opened
+ */
+ public InputStream createInputStream(long l) {
+ try {
+ FileSystem dfs = HdfsOverFtpSystem.getDfs();
+ FSDataInputStream in = dfs.open(fullPath);
+ return in;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+}
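
Every FTP verb ultimately maps to one of the FileSystem calls above, so the object can be smoke-tested without starting the FTP server. A sketch under stated assumptions: the filer URI is an example, the sketch sits in the same package as HdfsFileObject, and the null user is acceptable because the constructor only stores it.

    import org.apache.ftpserver.ftplet.FtpFile;

    import java.util.List;

    public class FileObjectSmokeTest {
        public static void main(String[] args) throws Exception {
            // Mirror what HFtpService.startServer() pushes into the holder class (example values).
            HdfsOverFtpSystem.setHdfsUri("seaweedfs://localhost:8888");
            HdfsOverFtpSystem.setSeaweedFsEnable(true);
            HdfsOverFtpSystem.setSeaweedFsAccess("direct");
            HdfsOverFtpSystem.setSeaweedFsReplication("000");

            // null user is tolerated here: the constructor only stores the reference.
            FtpFile dir = new HdfsFileObject("/buckets/test", "/", null);
            System.out.println("exists=" + dir.doesExist() + " dir=" + dir.isDirectory());

            List<FtpFile> files = dir.listFiles(); // null if the listing failed
            if (files != null) {
                for (FtpFile f : files) {
                    System.out.println(f.getName() + " " + f.getSize());
                }
            }
        }
    }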
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java
new file mode 100644
index 000000000..533c2c3aa
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.seaweed.ftp.service.filesystem;
+
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.User;
+
+/**
+ * Implements FileSystemFactory to create HdfsFileSystemView instances
+ */
+public class HdfsFileSystemManager implements FileSystemFactory {
+ public FileSystemView createFileSystemView(User user) {
+ return new HdfsFileSystemView(user);
+ }
+}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java
new file mode 100644
index 000000000..8b910e775
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java
@@ -0,0 +1,104 @@
+package org.apache.hadoop.seaweed.ftp.service.filesystem;
+
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+
+/**
+ * Implements FileSystemView backed by HdfsFileObject
+ */
+public class HdfsFileSystemView implements FileSystemView {
+
+ private String homePath;
+ private String currPath = File.separator;
+ private User user;
+
+ /**
+ * Constructor - set the user object.
+ */
+ protected HdfsFileSystemView(User user) {
+ if (user == null) {
+ throw new IllegalArgumentException("user can not be null");
+ }
+ if (user.getHomeDirectory() == null) {
+ throw new IllegalArgumentException(
+ "User home directory can not be null");
+ }
+
+ this.homePath = user.getHomeDirectory();
+ this.user = user;
+ }
+
+ public FtpFile getHomeDirectory() {
+ return new HdfsFileObject(homePath, File.separator, user);
+ }
+
+ public FtpFile getWorkingDirectory() {
+ FtpFile fileObj;
+ if (currPath.equals(File.separator)) {
+ fileObj = new HdfsFileObject(homePath, File.separator, user);
+ } else {
+ fileObj = new HdfsFileObject(homePath, currPath, user);
+
+ }
+ return fileObj;
+ }
+
+ public boolean changeWorkingDirectory(String dir) {
+
+ Path path;
+ if (dir.startsWith(File.separator) || new Path(currPath).equals(new Path(dir))) {
+ path = new Path(dir);
+ } else if (currPath.length() > 1) {
+ path = new Path(currPath + File.separator + dir);
+ } else {
+ if (dir.startsWith("/")) {
+ path = new Path(dir);
+ } else {
+ path = new Path(File.separator + dir);
+ }
+ }
+
+ // prevent ".." from escaping above the root directory
+ if (path.getName().equals("..")) {
+ path = new Path(File.separator);
+ }
+
+ HdfsFileObject file = new HdfsFileObject(homePath, path.toString(), user);
+ if (file.isDirectory()) {
+ currPath = path.toString();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public FtpFile getFile(String file) {
+ String path;
+ if (file.startsWith(File.separator)) {
+ path = file;
+ } else if (currPath.length() > 1) {
+ path = currPath + File.separator + file;
+ } else {
+ path = File.separator + file;
+ }
+ return new HdfsFileObject(homePath, path, user);
+ }
+
+ /**
+ * Is the file content randomly accessible?
+ */
+ public boolean isRandomAccessible() {
+ return true;
+ }
+
+ /**
+ * Dispose file system view - does nothing.
+ */
+ public void dispose() {
+ }
+
+}
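
The path resolution in changeWorkingDirectory() is the subtle part of this class. A short sketch of the same rules using org.apache.hadoop.fs.Path (all values are examples):

    import org.apache.hadoop.fs.Path;

    public class CwdResolutionDemo {
        public static void main(String[] args) {
            String currPath = "/data"; // example working directory

            // absolute input: taken as-is
            System.out.println(new Path("/logs"));                 // /logs

            // relative input under a non-root cwd: appended to it
            System.out.println(new Path(currPath + "/" + "sub"));  // /data/sub

            // ".." is collapsed to root, mirroring the guard in the view
            Path up = new Path(currPath + "/..");
            System.out.println(up.getName().equals("..") ? new Path("/") : up); // /
        }
    }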
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java
new file mode 100644
index 000000000..149fd6857
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java
@@ -0,0 +1,72 @@
+package org.apache.hadoop.seaweed.ftp.service.filesystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Class to store DFS connection
+ */
+public class HdfsOverFtpSystem {
+
+ private static FileSystem fs = null;
+
+ private static String hdfsUri;
+
+ private static boolean seaweedFsEnable;
+
+ private static String seaweedFsAccess;
+
+ private static String seaweedFsReplication;
+
+ private final static Logger log = LoggerFactory.getLogger(HdfsOverFtpSystem.class);
+
+ private static void hdfsInit() throws IOException {
+ Configuration configuration = new Configuration();
+
+ configuration.set("fs.defaultFS", hdfsUri);
+ if(seaweedFsEnable) {
+ configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ configuration.set("fs.seaweed.volume.server.access", seaweedFsAccess);
+ configuration.set("fs.seaweed.replication", seaweedFsReplication);
+ }
+ fs = FileSystem.get(configuration);
+ log.info("HDFS load success");
+ }
+
+ /**
+ * Get dfs
+ *
+ * @return dfs
+ * @throws IOException
+ */
+ public static FileSystem getDfs() throws IOException {
+ if (fs == null) {
+ hdfsInit();
+ }
+ return fs;
+ }
+
+ public static void setHdfsUri(String hdfsUri) {
+ HdfsOverFtpSystem.hdfsUri = hdfsUri;
+ }
+
+ public static String getHdfsUri() {
+ return hdfsUri;
+ }
+
+ public static void setSeaweedFsEnable(boolean seaweedFsEnable) {
+ HdfsOverFtpSystem.seaweedFsEnable = seaweedFsEnable;
+ }
+
+ public static void setSeaweedFsAccess(String seaweedFsAccess) {
+ HdfsOverFtpSystem.seaweedFsAccess = seaweedFsAccess;
+ }
+
+ public static void setSeaweedFsReplication(String seaweedFsReplication) {
+ HdfsOverFtpSystem.seaweedFsReplication = seaweedFsReplication;
+ }
+} \ No newline at end of file
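
The same bootstrap can be reproduced standalone, which is handy for checking the SeaweedFS property keys without starting the FTP server. A sketch with example values matching application.yml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DfsBootstrapSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "seaweedfs://localhost:8888"); // example URI
            // The three SeaweedFS keys hdfsInit() sets when seaweedFs.enable is true:
            conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
            conf.set("fs.seaweed.volume.server.access", "direct");
            conf.set("fs.seaweed.replication", "000");

            FileSystem fs = FileSystem.get(conf);
            System.out.println(fs.exists(new Path("/"))); // true if the filer is reachable
            fs.close();
        }
    }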
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java
new file mode 100644
index 000000000..c82f6516f
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java
@@ -0,0 +1,239 @@
+package org.apache.hadoop.seaweed.ftp.users;
+
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.AuthorizationRequest;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class HdfsUser implements User, Serializable {
+
+ private static final long serialVersionUID = -47371353779731294L;
+
+ private String name = null;
+
+ private String password = null;
+
+ private int maxIdleTimeSec = 0; // no limit
+
+ private String homeDir = null;
+
+ private boolean isEnabled = true;
+
+ private List<? extends Authority> authorities = new ArrayList<Authority>();
+
+ private ArrayList<String> groups = new ArrayList<String>();
+
+ private Logger log = Logger.getLogger(HdfsUser.class);
+
+ /**
+ * Default constructor.
+ */
+ public HdfsUser() {
+ }
+
+ /**
+ * Copy constructor.
+ */
+ public HdfsUser(User user) {
+ name = user.getName();
+ password = user.getPassword();
+ authorities = user.getAuthorities();
+ maxIdleTimeSec = user.getMaxIdleTime();
+ homeDir = user.getHomeDirectory();
+ isEnabled = user.getEnabled();
+ }
+
+ public ArrayList<String> getGroups() {
+ return groups;
+ }
+
+ /**
+ * Get the main group of the user
+ *
+ * @return main group of the user
+ */
+ public String getMainGroup() {
+ if (groups.size() > 0) {
+ return groups.get(0);
+ } else {
+ log.error("User " + name + " is not a memer of any group");
+ return "error";
+ }
+ }
+
+ /**
+ * Checks if user is a member of the group
+ *
+ * @param group to check
+ * @return true if the user is a member of the group
+ */
+ public boolean isGroupMember(String group) {
+ for (String userGroup : groups) {
+ if (userGroup.equals(group)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Set the user's groups
+ *
+ * @param groups to set
+ */
+ public void setGroups(ArrayList<String> groups) {
+ if (groups.size() < 1) {
+ log.error("User " + name + " is not a memer of any group");
+ }
+ this.groups = groups;
+ }
+
+ /**
+ * Get the user name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set user name.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the user password.
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Set user password.
+ */
+ public void setPassword(String pass) {
+ password = pass;
+ }
+
+ public List<Authority> getAuthorities() {
+ if (authorities != null) {
+ return Collections.unmodifiableList(authorities);
+ } else {
+ return null;
+ }
+ }
+
+ public void setAuthorities(List<Authority> authorities) {
+ if (authorities != null) {
+ this.authorities = Collections.unmodifiableList(authorities);
+ } else {
+ this.authorities = null;
+ }
+ }
+
+ /**
+ * Get the maximum idle time in seconds.
+ */
+ public int getMaxIdleTime() {
+ return maxIdleTimeSec;
+ }
+
+ /**
+ * Set the maximum idle time in seconds.
+ */
+ public void setMaxIdleTime(int idleSec) {
+ maxIdleTimeSec = idleSec;
+ if (maxIdleTimeSec < 0) {
+ maxIdleTimeSec = 0;
+ }
+ }
+
+ /**
+ * Get the user enable status.
+ */
+ public boolean getEnabled() {
+ return isEnabled;
+ }
+
+ /**
+ * Set the user enable status.
+ */
+ public void setEnabled(boolean enb) {
+ isEnabled = enb;
+ }
+
+ /**
+ * Get the user home directory.
+ */
+ public String getHomeDirectory() {
+ return homeDir;
+ }
+
+ /**
+ * Set the user home directory.
+ */
+ public void setHomeDirectory(String home) {
+ homeDir = home;
+ }
+
+ /**
+ * String representation.
+ */
+ public String toString() {
+ return name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public AuthorizationRequest authorize(AuthorizationRequest request) {
+ List<Authority> authorities = getAuthorities();
+
+ // check for no authorities at all
+ if (authorities == null) {
+ return null;
+ }
+
+ boolean someoneCouldAuthorize = false;
+ for (Authority authority : authorities) {
+ if (authority.canAuthorize(request)) {
+ someoneCouldAuthorize = true;
+
+ request = authority.authorize(request);
+
+ // authorization failed, return null
+ if (request == null) {
+ return null;
+ }
+ }
+
+ }
+
+ if (someoneCouldAuthorize) {
+ return request;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<Authority> getAuthorities(Class<? extends Authority> clazz) {
+ List<Authority> selected = new ArrayList<Authority>();
+
+ for (Authority authority : authorities) {
+ if (authority.getClass().equals(clazz)) {
+ selected.add(authority);
+ }
+ }
+
+ return selected;
+ }
+}
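
A sketch of assembling an HdfsUser by hand, granting the same authorities the user manager below builds when it loads an entry from users.properties (all values are examples; the permission classes are Apache FtpServer's):

    import org.apache.ftpserver.ftplet.Authority;
    import org.apache.ftpserver.usermanager.impl.ConcurrentLoginPermission;
    import org.apache.ftpserver.usermanager.impl.TransferRatePermission;
    import org.apache.ftpserver.usermanager.impl.WritePermission;

    import java.util.ArrayList;
    import java.util.List;

    public class HdfsUserSketch {
        public static void main(String[] args) {
            HdfsUser user = new HdfsUser();
            user.setName("test");
            user.setPassword("secret");
            user.setHomeDirectory("/buckets/test/");
            user.setEnabled(true);
            user.setMaxIdleTime(0); // 0 = no idle limit

            List<Authority> authorities = new ArrayList<Authority>();
            authorities.add(new WritePermission());
            authorities.add(new ConcurrentLoginPermission(0, 0)); // 0 = unlimited logins
            authorities.add(new TransferRatePermission(0, 0));    // 0 = unthrottled
            user.setAuthorities(authorities);

            System.out.println(user + " home=" + user.getHomeDirectory());
        }
    }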
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java
new file mode 100644
index 000000000..7eb296160
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java
@@ -0,0 +1,453 @@
+package org.apache.hadoop.seaweed.ftp.users;
+
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.ftplet.*;
+import org.apache.ftpserver.usermanager.*;
+import org.apache.ftpserver.usermanager.impl.*;
+import org.apache.ftpserver.util.BaseProperties;
+import org.apache.ftpserver.util.IoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+public class HdfsUserManager extends AbstractUserManager {
+
+ private final Logger LOG = LoggerFactory
+ .getLogger(HdfsUserManager.class);
+
+ private final static String DEPRECATED_PREFIX = "FtpServer.user.";
+
+ private final static String PREFIX = "ftpserver.user.";
+
+ private static BaseProperties userDataProp;
+
+ private File userDataFile = new File("users.conf");
+
+ private boolean isConfigured = false;
+
+ private PasswordEncryptor passwordEncryptor = new Md5PasswordEncryptor();
+
+
+ /**
+ * Retrieve the file used to load and store users
+ *
+ * @return The file
+ */
+ public File getFile() {
+ return userDataFile;
+ }
+
+ /**
+ * Set the file used to store and read users. Must be set before
+ * {@link #configure()} is called.
+ *
+ * @param propFile A file containing users
+ */
+ public void setFile(File propFile) {
+ if (isConfigured) {
+ throw new IllegalStateException("Must be called before configure()");
+ }
+
+ this.userDataFile = propFile;
+ }
+
+
+ /**
+ * Retrieve the password encryptor used for this user manager
+ *
+ * @return The password encryptor. Default to {@link Md5PasswordEncryptor}
+ * if no other has been provided
+ */
+ public PasswordEncryptor getPasswordEncryptor() {
+ return passwordEncryptor;
+ }
+
+
+ /**
+ * Set the password encryptor to use for this user manager
+ *
+ * @param passwordEncryptor The password encryptor
+ */
+ public void setPasswordEncryptor(PasswordEncryptor passwordEncryptor) {
+ this.passwordEncryptor = passwordEncryptor;
+ }
+
+
+ /**
+ * Lazy init the user manager
+ */
+ private void lazyInit() {
+ if (!isConfigured) {
+ configure();
+ }
+ }
+
+ /**
+ * Configure user manager.
+ */
+ public void configure() {
+ isConfigured = true;
+ try {
+ userDataProp = new BaseProperties();
+
+ if (userDataFile != null && userDataFile.exists()) {
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(userDataFile);
+ userDataProp.load(fis);
+ } finally {
+ IoUtils.close(fis);
+ }
+ }
+ } catch (IOException e) {
+ throw new FtpServerConfigurationException(
+ "Error loading user data file : "
+ + userDataFile.getAbsolutePath(), e);
+ }
+
+ convertDeprecatedPropertyNames();
+ }
+
+ private void convertDeprecatedPropertyNames() {
+ Enumeration<?> keys = userDataProp.propertyNames();
+
+ boolean doSave = false;
+
+ while (keys.hasMoreElements()) {
+ String key = (String) keys.nextElement();
+
+ if (key.startsWith(DEPRECATED_PREFIX)) {
+ String newKey = PREFIX
+ + key.substring(DEPRECATED_PREFIX.length());
+ userDataProp.setProperty(newKey, userDataProp.getProperty(key));
+ userDataProp.remove(key);
+
+ doSave = true;
+ }
+ }
+
+ if (doSave) {
+ try {
+ saveUserData();
+ } catch (FtpException e) {
+ throw new FtpServerConfigurationException(
+ "Failed to save updated user data", e);
+ }
+ }
+ }
+
+ public synchronized void save(User usr, boolean renamePush) throws FtpException {
+ lazyInit();
+ userDataProp.setProperty(PREFIX + usr.getName() + ".rename.push", renamePush);
+ save(usr);
+ }
+
+ /**
+ * Save user data. Store the properties.
+ */
+ public synchronized void save(User usr) throws FtpException {
+ lazyInit();
+
+ // null value check
+ if (usr.getName() == null) {
+ throw new NullPointerException("User name is null.");
+ }
+ String thisPrefix = PREFIX + usr.getName() + '.';
+
+ // set other properties
+ userDataProp.setProperty(thisPrefix + ATTR_PASSWORD, getPassword(usr));
+
+ String home = usr.getHomeDirectory();
+ if (home == null) {
+ home = "/";
+ }
+ userDataProp.setProperty(thisPrefix + ATTR_HOME, home);
+ userDataProp.setProperty(thisPrefix + ATTR_ENABLE, usr.getEnabled());
+ userDataProp.setProperty(thisPrefix + ATTR_WRITE_PERM, usr
+ .authorize(new WriteRequest()) != null);
+ userDataProp.setProperty(thisPrefix + ATTR_MAX_IDLE_TIME, usr
+ .getMaxIdleTime());
+
+ TransferRateRequest transferRateRequest = new TransferRateRequest();
+ transferRateRequest = (TransferRateRequest) usr
+ .authorize(transferRateRequest);
+
+ if (transferRateRequest != null) {
+ userDataProp.setProperty(thisPrefix + ATTR_MAX_UPLOAD_RATE,
+ transferRateRequest.getMaxUploadRate());
+ userDataProp.setProperty(thisPrefix + ATTR_MAX_DOWNLOAD_RATE,
+ transferRateRequest.getMaxDownloadRate());
+ } else {
+ userDataProp.remove(thisPrefix + ATTR_MAX_UPLOAD_RATE);
+ userDataProp.remove(thisPrefix + ATTR_MAX_DOWNLOAD_RATE);
+ }
+
+ // request that always will succeed
+ ConcurrentLoginRequest concurrentLoginRequest = new ConcurrentLoginRequest(
+ 0, 0);
+ concurrentLoginRequest = (ConcurrentLoginRequest) usr
+ .authorize(concurrentLoginRequest);
+
+ if (concurrentLoginRequest != null) {
+ userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_NUMBER,
+ concurrentLoginRequest.getMaxConcurrentLogins());
+ userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_PER_IP,
+ concurrentLoginRequest.getMaxConcurrentLoginsPerIP());
+ } else {
+ userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_NUMBER);
+ userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_PER_IP);
+ }
+
+ saveUserData();
+ }
+
+ /**
+ * Persist the user properties file to disk.
+ *
+ * @throws FtpException if the file cannot be written
+ */
+ private void saveUserData() throws FtpException {
+ File dir = userDataFile.getAbsoluteFile().getParentFile();
+ if (dir != null && !dir.exists() && !dir.mkdirs()) {
+ String dirName = dir.getAbsolutePath();
+ throw new FtpServerConfigurationException(
+ "Cannot create directory for user data file : " + dirName);
+ }
+
+ // save user data
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(userDataFile);
+ userDataProp.store(fos, "Generated file - don't edit (please)");
+ } catch (IOException ex) {
+ LOG.error("Failed saving user data", ex);
+ throw new FtpException("Failed saving user data", ex);
+ } finally {
+ IoUtils.close(fos);
+ }
+ }
+
+
+ public synchronized void list() throws FtpException {
+ lazyInit();
+
+ Map dataMap = new HashMap();
+ Enumeration<String> propNames = (Enumeration<String>) userDataProp.propertyNames();
+ ArrayList<String> a = Collections.list(propNames);
+ a.remove("i18nMap");//去除i18nMap
+ for(String attrName : a){
+// dataMap.put(attrName, propNames.);
+ }
+
+ }
+
+ /**
+ * Delete a user. Removes all of this user's entries from the properties,
+ * then saves the data.
+ */
+ public synchronized void delete(String usrName) throws FtpException {
+ lazyInit();
+
+ // remove entries from properties
+ String thisPrefix = PREFIX + usrName + '.';
+ Enumeration<?> propNames = userDataProp.propertyNames();
+ ArrayList<String> remKeys = new ArrayList<String>();
+ while (propNames.hasMoreElements()) {
+ String thisKey = propNames.nextElement().toString();
+ if (thisKey.startsWith(thisPrefix)) {
+ remKeys.add(thisKey);
+ }
+ }
+ Iterator<String> remKeysIt = remKeys.iterator();
+ while (remKeysIt.hasNext()) {
+ userDataProp.remove(remKeysIt.next());
+ }
+
+ saveUserData();
+ }
+
+ /**
+ * Get user password. Returns the encrypted value.
+ * <p/>
+ * <pre>
+ * If the password value is not null
+ * password = new password
+ * else
+ * if user does exist
+ * password = old password
+ * else
+ * password = &quot;&quot;
+ * </pre>
+ */
+ private String getPassword(User usr) {
+ String name = usr.getName();
+ String password = usr.getPassword();
+
+ if (password != null) {
+ password = passwordEncryptor.encrypt(password);
+ } else {
+ String blankPassword = passwordEncryptor.encrypt("");
+
+ if (doesExist(name)) {
+ String key = PREFIX + name + '.' + ATTR_PASSWORD;
+ password = userDataProp.getProperty(key, blankPassword);
+ } else {
+ password = blankPassword;
+ }
+ }
+ return password;
+ }
+
+ /**
+ * Get all user names.
+ */
+ public synchronized String[] getAllUserNames() {
+ lazyInit();
+
+ // get all user names
+ String suffix = '.' + ATTR_HOME;
+ ArrayList<String> ulst = new ArrayList<String>();
+ Enumeration<?> allKeys = userDataProp.propertyNames();
+ int prefixlen = PREFIX.length();
+ int suffixlen = suffix.length();
+ while (allKeys.hasMoreElements()) {
+ String key = (String) allKeys.nextElement();
+ if (key.endsWith(suffix)) {
+ String name = key.substring(prefixlen);
+ int endIndex = name.length() - suffixlen;
+ name = name.substring(0, endIndex);
+ ulst.add(name);
+ }
+ }
+
+ Collections.sort(ulst);
+ return ulst.toArray(new String[0]);
+ }
+
+ private ArrayList<String> parseGroups(String groupsLine) {
+ String[] groupsArray = groupsLine.split(",");
+ return new ArrayList<String>(Arrays.asList(groupsArray));
+ }
+
+ public static synchronized boolean getUserRenamePush(String userName) {
+ return userDataProp.getBoolean(PREFIX + userName + ".rename.push", false);
+ }
+
+ /**
+ * Load user data.
+ */
+ public synchronized User getUserByName(String userName) {
+ lazyInit();
+
+ if (!doesExist(userName)) {
+ return null;
+ }
+
+ String baseKey = PREFIX + userName + '.';
+ HdfsUser user = new HdfsUser();
+ user.setName(userName);
+ user.setEnabled(userDataProp.getBoolean(baseKey + ATTR_ENABLE, true));
+ user.setHomeDirectory(userDataProp
+ .getProperty(baseKey + ATTR_HOME, "/"));
+
+// user.setGroups(parseGroups(userDataProp
+// .getProperty(baseKey + "groups")));
+
+ List<Authority> authorities = new ArrayList<Authority>();
+
+ if (userDataProp.getBoolean(baseKey + ATTR_WRITE_PERM, false)) {
+ authorities.add(new WritePermission());
+ }
+
+ int maxLogin = userDataProp.getInteger(baseKey + ATTR_MAX_LOGIN_NUMBER,
+ 0);
+ int maxLoginPerIP = userDataProp.getInteger(baseKey
+ + ATTR_MAX_LOGIN_PER_IP, 0);
+
+ authorities.add(new ConcurrentLoginPermission(maxLogin, maxLoginPerIP));
+
+ int uploadRate = userDataProp.getInteger(
+ baseKey + ATTR_MAX_UPLOAD_RATE, 0);
+ int downloadRate = userDataProp.getInteger(baseKey
+ + ATTR_MAX_DOWNLOAD_RATE, 0);
+
+ authorities.add(new TransferRatePermission(downloadRate, uploadRate));
+
+ user.setAuthorities(authorities);
+
+ user.setMaxIdleTime(userDataProp.getInteger(baseKey
+ + ATTR_MAX_IDLE_TIME, 0));
+
+ return user;
+ }
+
+ /**
+ * User existence check
+ */
+ public synchronized boolean doesExist(String name) {
+ lazyInit();
+
+ String key = PREFIX + name + '.' + ATTR_HOME;
+ return userDataProp.containsKey(key);
+ }
+
+ /**
+ * User authenticate method
+ */
+ public synchronized User authenticate(Authentication authentication)
+ throws AuthenticationFailedException {
+ lazyInit();
+
+ if (authentication instanceof UsernamePasswordAuthentication) {
+ UsernamePasswordAuthentication upauth = (UsernamePasswordAuthentication) authentication;
+
+ String user = upauth.getUsername();
+ String password = upauth.getPassword();
+
+ if (user == null) {
+ throw new AuthenticationFailedException("Authentication failed");
+ }
+
+ if (password == null) {
+ password = "";
+ }
+
+ String storedPassword = userDataProp.getProperty(PREFIX + user + '.'
+ + ATTR_PASSWORD);
+
+ if (storedPassword == null) {
+ // user does not exist
+ throw new AuthenticationFailedException("Authentication failed");
+ }
+
+ if (passwordEncryptor.matches(password, storedPassword)) {
+ return getUserByName(user);
+ } else {
+ throw new AuthenticationFailedException("Authentication failed");
+ }
+
+ } else if (authentication instanceof AnonymousAuthentication) {
+ if (doesExist("anonymous")) {
+ return getUserByName("anonymous");
+ } else {
+ throw new AuthenticationFailedException("Authentication failed");
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Authentication not supported by this user manager");
+ }
+ }
+
+ /**
+ * Close the user manager - remove existing entries.
+ */
+ public synchronized void dispose() {
+ if (userDataProp != null) {
+ userDataProp.clear();
+ userDataProp = null;
+ }
+ }
+}
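
A sketch of adding a user programmatically through this manager, matching the users.properties location the controllers use; save() encrypts the password with the configured encryptor (MD5 by default) before writing. The user name and password are example values.

    import java.io.File;

    public class AddUserSketch {
        public static void main(String[] args) throws Exception {
            HdfsUserManager manager = new HdfsUserManager();
            manager.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));

            HdfsUser user = new HdfsUser();
            user.setName("alice");               // example user
            user.setPassword("alice-password");  // stored encrypted by save()
            user.setHomeDirectory("/buckets/alice/");
            user.setEnabled(true);

            manager.save(user, true); // true => sets the per-user rename.push flag
            System.out.println(manager.doesExist("alice")); // true
        }
    }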
diff --git a/other/java/hdfs-over-ftp/src/main/resources/application.yml b/other/java/hdfs-over-ftp/src/main/resources/application.yml
new file mode 100644
index 000000000..128bab1f9
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/resources/application.yml
@@ -0,0 +1,15 @@
+server:
+ port: 8080
+
+ftp:
+ port: 2222
+ passive-address: localhost
+ passive-ports: 30000-30999
+
+hdfs:
+ uri: seaweedfs://localhost:8888
+
+seaweedFs:
+ enable: true
+ access: direct # direct/filerProxy/publicUrl
+ replication: "000" \ No newline at end of file
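
Since this is a standard Spring Boot application.yml, each of these values can also be overridden at launch without editing the file, e.g. java -jar hdfs-over-ftp.jar --ftp.port=2121 --seaweedFs.enable=false (the jar name here is illustrative).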
diff --git a/other/java/hdfs-over-ftp/src/main/resources/assembly.xml b/other/java/hdfs-over-ftp/src/main/resources/assembly.xml
new file mode 100644
index 000000000..84fef56f8
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/resources/assembly.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+
+ <id>package</id>
+ <formats>
+ <!-- 指定打包格式,支持的打包格式有zip、tar、tar.gz (or tgz)、tar.bz2 (or tbz2)、jar、dir、war,可以同时指定多个打包格式 -->
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/resources</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>application.yml</include>
+ <include>logback-spring.xml</include>
+ <include>users.properties</include>
+ <include>kafka-producer.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>false</useProjectArtifact>
+ <outputDirectory>lib</outputDirectory>
+ <scope>runtime</scope>
+ <unpack>false</unpack>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml b/other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml
new file mode 100644
index 000000000..96b4c1d71
--- /dev/null
+++ b/other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<configuration>
+ <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
+ <property name="LOG_HOME" value="${user.dir}/logs/" />
+
+ <!-- 控制台输出 -->
+ <appender name="Stdout" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- 日志输出编码 -->
+ <layout class="ch.qos.logback.classic.PatternLayout">
+ <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
+ <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
+ </pattern>
+ </layout>
+ </appender>
+
+ <!-- 按照每天生成日志文件 -->
+ <appender name="RollingFile"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <File>${LOG_HOME}/fileLog.log</File>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_HOME}/fileLog.log.%d.%i</fileNamePattern>
+ <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>100 MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ <encoder>
+ <pattern>
+ %d %p (%file:%line\)- %m%n
+ </pattern>
+ <charset>UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <!-- 日志输出级别 -->
+ <root level="info">
+ <appender-ref ref="Stdout" />
+ <appender-ref ref="RollingFile" />
+ </root>
+
+</configuration> \ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/users.properties b/other/java/hdfs-over-ftp/users.properties
new file mode 100644
index 000000000..aeeab8e35
--- /dev/null
+++ b/other/java/hdfs-over-ftp/users.properties
@@ -0,0 +1,12 @@
+#Generated file - don't edit (please)
+#Thu Mar 11 19:11:12 CST 2021
+ftpserver.user.test.idletime=0
+ftpserver.user.test.maxloginperip=0
+ftpserver.user.test.userpassword=44664D4D827C740293D2AA244FB60445
+ftpserver.user.test.enableflag=true
+ftpserver.user.test.maxloginnumber=0
+ftpserver.user.test.rename.push=true
+ftpserver.user.test.homedirectory=/buckets/test/
+ftpserver.user.test.downloadrate=0
+ftpserver.user.test.writepermission=true
+ftpserver.user.test.uploadrate=0
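
The userpassword value above is the MD5 form produced by the manager's default encryptor. A one-liner sketch for generating a hash to paste into this file, using the same Apache FtpServer class:

    import org.apache.ftpserver.usermanager.Md5PasswordEncryptor;

    public class PasswordHashSketch {
        public static void main(String[] args) {
            // Prints the MD5 hex digest of the given cleartext password.
            System.out.println(new Md5PasswordEncryptor().encrypt("test"));
        }
    }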
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index d818bc878..503e5fbdf 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -15,8 +15,8 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -120,6 +120,180 @@
</plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>xmlenc</artifactId>
+ <groupId>xmlenc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-sslengine</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jets3t</artifactId>
+ <groupId>net.java.dev.jets3t</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration</artifactId>
+ <groupId>commons-configuration</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-core-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +301,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index b8c8cb891..6eeba912e 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
@@ -31,8 +31,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -147,6 +147,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
new file mode 100644
index 000000000..3d0b68a52
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
@@ -0,0 +1,25 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
+
+ public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
+ super(in, size);
+ if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+ throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (this.in instanceof ByteBufferReadable) {
+ return ((ByteBufferReadable)this.in).read(buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ }
+ }
+}
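
A sketch of how this wrapper might be used: any seekable, positioned-readable FSInputStream can be passed in, and the ByteBuffer path works when the wrapped stream also implements ByteBufferReadable (the buffer size and method shape here are illustrative):

    import org.apache.hadoop.fs.FSInputStream;

    import java.nio.ByteBuffer;

    public class ByteBufferReadSketch {
        // Returns the number of bytes read into the buffer, or -1 at EOF.
        // Assumes `in` also implements ByteBufferReadable; otherwise read(ByteBuffer) throws.
        static int readChunk(FSInputStream in) throws Exception {
            BufferedByteBufferReadableInputStream buffered =
                    new BufferedByteBufferReadableInputStream(in, 4 * 1024 * 1024);
            ByteBuffer buf = ByteBuffer.allocate(8192);
            return buffered.read(buf); // delegates to the wrapped stream's ByteBufferReadable.read
        }
    }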
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
- private SeaweedInputStream stream;
- private long offset; // offset within the file for the buffer
- private int length; // actual length, set after the buffer is filles
- private int requestedLength; // requested length of the read
- private byte[] buffer; // the buffer itself
- private int bufferindex = -1; // index in the buffers array in Buffer manager
- private ReadBufferStatus status; // status of the buffer
- private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
- // waiting on this buffer gets unblocked
-
- // fields to help with eviction logic
- private long timeStamp = 0; // tick at which buffer became available to read
- private boolean isFirstByteConsumed = false;
- private boolean isLastByteConsumed = false;
- private boolean isAnyByteConsumed = false;
-
- public SeaweedInputStream getStream() {
- return stream;
- }
-
- public void setStream(SeaweedInputStream stream) {
- this.stream = stream;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public int getLength() {
- return length;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public int getRequestedLength() {
- return requestedLength;
- }
-
- public void setRequestedLength(int requestedLength) {
- this.requestedLength = requestedLength;
- }
-
- public byte[] getBuffer() {
- return buffer;
- }
-
- public void setBuffer(byte[] buffer) {
- this.buffer = buffer;
- }
-
- public int getBufferindex() {
- return bufferindex;
- }
-
- public void setBufferindex(int bufferindex) {
- this.bufferindex = bufferindex;
- }
-
- public ReadBufferStatus getStatus() {
- return status;
- }
-
- public void setStatus(ReadBufferStatus status) {
- this.status = status;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public boolean isFirstByteConsumed() {
- return isFirstByteConsumed;
- }
-
- public void setFirstByteConsumed(boolean isFirstByteConsumed) {
- this.isFirstByteConsumed = isFirstByteConsumed;
- }
-
- public boolean isLastByteConsumed() {
- return isLastByteConsumed;
- }
-
- public void setLastByteConsumed(boolean isLastByteConsumed) {
- this.isLastByteConsumed = isLastByteConsumed;
- }
-
- public boolean isAnyByteConsumed() {
- return isAnyByteConsumed;
- }
-
- public void setAnyByteConsumed(boolean isAnyByteConsumed) {
- this.isAnyByteConsumed = isAnyByteConsumed;
- }
-
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java
deleted file mode 100644
index 5b1e21529..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package seaweed.hdfs;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * The Read Buffer Manager for Rest AbfsClient.
- */
-final class ReadBufferManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-
- private static final int NUM_BUFFERS = 16;
- private static final int BLOCK_SIZE = 4 * 1024 * 1024;
- private static final int NUM_THREADS = 8;
- private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
-
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
- private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
- static {
- BUFFER_MANAGER = new ReadBufferManager();
- BUFFER_MANAGER.init();
- }
-
- static ReadBufferManager getBufferManager() {
- return BUFFER_MANAGER;
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("SeaweedFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- }
-
-
- /*
- *
- * SeaweedInputStream-facing methods
- *
- */
-
-
- /**
- * {@link SeaweedInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
- * @param requestedOffset The offset in the file which shoukd be read
- * @param requestedLength The length to read
- */
- void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
-
-
- /**
- * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a
- * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
- * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own
- * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time).
- *
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read
- */
- int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
-
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
- *
- */
-
- private void waitForProcess(final SeaweedInputStream stream, final long position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if in in-progress queue, then block for it
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's thread
- // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
- // is done processing it (in doneReading). There, the latch is set after removing the buffer from
- // inProgressList. So this latch is safe to be outside the synchronized block.
- // Putting it inside the synchronized block would result in a deadlock, since this thread would hold the lock
- // while waiting, so no one else could change any state. If this becomes more complex in the future,
- // the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
-
- /**
- * If any buffer in the completed list can be reclaimed, then reclaim it and return the buffer to the free list.
- * The objective is to find just one buffer - there is no advantage to evicting more than one.
- *
- * @return whether the eviction succeeded - i.e., whether we were able to free up one buffer
- */
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
-
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- long earliestBirthday = Long.MAX_VALUE;
- for (ReadBuffer buf : completedReadList) {
- if (buf.getTimeStamp() < earliestBirthday) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- }
- }
- if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- freeList.push(buf.getBufferindex());
- completedReadList.remove(buf);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
-
- private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
- final byte[] buffer) {
- ReadBuffer buf = getFromList(completedReadList, stream, position);
- if (buf == null || position >= buf.getOffset() + buf.getLength()) {
- return 0;
- }
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
-
- /*
- *
- * ReadBufferWorker-thread-facing methods
- *
- */
-
- /**
- * ReadBufferWorker thread calls this to get the next buffer that it should work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
- */
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
-
- /**
- * ReadBufferWorker thread calls this method to post completion.
- *
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
- */
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
- }
- synchronized (this) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setTimeStamp(currentTimeMillis());
- buffer.setLength(bytesActuallyRead);
- completedReadList.add(buffer);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
- }
- }
- //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
-
- /**
- * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
- * System.currentTimeMillis can go backwards when the system clock is changed (e.g., by NTP time synchronization),
- * making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
- * Note: it is not monotonic across CPU sockets, and even within a single CPU, only the
- * more recent generations share a clock across all cores.
- *
- * @return current time in milliseconds
- */
- private long currentTimeMillis() {
- return System.nanoTime() / 1000 / 1000;
- }
-}
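The getNextBlockToRead/queueReadAhead pair above is a classic monitor-based producer/consumer handoff. Below is a minimal, self-contained sketch of that pattern, assuming nothing from the SeaweedFS classes; all names are illustrative, not part of this codebase.

    import java.util.LinkedList;
    import java.util.Queue;

    // Sketch of the wait()/notifyAll() handoff used by ReadBufferManager:
    // producers enqueue work and wake sleepers; consumers wait while empty.
    class HandoffQueue<T> {
        private final Queue<T> queue = new LinkedList<>();

        synchronized void put(T item) {
            queue.add(item);
            notifyAll(); // wake any worker blocked in take()
        }

        synchronized T take() throws InterruptedException {
            while (queue.isEmpty()) {
                wait(); // releases the monitor while blocked, re-checks on wakeup
            }
            return queue.remove();
        }
    }

The while loop (rather than an if) guards against spurious wakeups, which is the same reason getNextBlockToRead loops on readAheadQueue.size() == 0 before removing an element.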
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java
deleted file mode 100644
index 6ffbc4644..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBufferWorker implements Runnable {
-
- protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
- private int id;
-
- ReadBufferWorker(final int id) {
- this.id = id;
- }
-
- /**
- * Returns the ID of this ReadBufferWorker.
- */
- public int getId() {
- return this.id;
- }
-
- /**
- * Waits until a buffer becomes available in ReadAheadQueue.
- * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
- * Rinse and repeat. Forever.
- */
- public void run() {
- try {
- UNLEASH_WORKERS.await();
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- ReadBuffer buffer;
- while (true) {
- try {
- buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- if (buffer != null) {
- try {
- // do the actual read, from the file.
- int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
- bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
- } catch (Exception ex) {
- bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
- }
- }
- }
- }
-}
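The UNLEASH_WORKERS latch above acts as a start gate: worker threads are started inside ReadBufferManager.init(), but each blocks on await() until the buffers are fully wired up, at which point init() calls countDown() once. A minimal sketch of the same gating pattern follows; the class and variable names are illustrative.

    import java.util.concurrent.CountDownLatch;

    public class StartGateDemo {
        public static void main(String[] args) {
            final CountDownLatch startGate = new CountDownLatch(1);
            for (int i = 0; i < 4; i++) {
                final int id = i;
                new Thread(() -> {
                    try {
                        startGate.await(); // block until the gate opens
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    System.out.println("worker " + id + " released");
                }).start();
            }
            startGate.countDown(); // open the gate; all workers proceed together
        }
    }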
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
index d63674977..e021401aa 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
@@ -18,12 +18,18 @@
package seaweed.hdfs;
-/**
- * The ReadBufferStatus for Rest AbfsClient
- */
-public enum ReadBufferStatus {
- NOT_AVAILABLE, // buffers sitting in the readAheadQueue have this status
- READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
- AVAILABLE, // data is available in buffer. It should be in completedList
- READ_FAILED // read completed, but failed.
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
+
+ SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
+ throws IOException, URISyntaxException {
+ super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
+ }
+
}
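SeaweedAbstractFileSystem plugs into Hadoop's newer FileContext API via DelegateToFileSystem. A hedged sketch of how such a class is typically wired into a Hadoop Configuration follows; the key names use Hadoop's standard fs.AbstractFileSystem.<scheme>.impl and fs.<scheme>.impl patterns, and the scheme string should be verified against the actual deployment.

    import org.apache.hadoop.conf.Configuration;

    public class SeaweedSchemeRegistration {
        // Hypothetical wiring; key names follow Hadoop's standard pattern.
        public static Configuration register() {
            Configuration conf = new Configuration();
            // FileContext (the newer API) resolves schemes through AbstractFileSystem.
            conf.set("fs.AbstractFileSystem.seaweedfs.impl",
                    "seaweed.hdfs.SeaweedAbstractFileSystem");
            // The classic FileSystem API resolves through fs.<scheme>.impl.
            conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
            return conf;
        }
    }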
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index d471d8440..25395db7a 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,31 +5,31 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+public class SeaweedFileSystem extends FileSystem {
-public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
-
- public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+ public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+ public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
+ public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
+ public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access";
+ public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -60,16 +60,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
- seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf);
}
@Override
+ public void close() throws IOException {
+ super.close();
+ this.seaweedFileSystemStore.close();
+ }
+
+ @Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
@@ -77,8 +81,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
try {
- InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(inputStream);
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
+ return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
@@ -94,8 +99,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
try {
- String replicaPlacement = String.format("%03d", replication - 1);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+ String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@@ -105,8 +111,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/**
* {@inheritDoc}
+ *
* @throws FileNotFoundException if the parent directory is not present -or
- * is not a directory.
+ * is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
@@ -123,9 +130,10 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize,
- replication, blockSize, progress);
+ replication, seaweedBufferSize, progress);
}
@Override
@@ -135,7 +143,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
try {
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@@ -144,7 +153,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
- public boolean rename(Path src, Path dst) {
+ public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename path: {} => {}", src, dst);
@@ -155,12 +164,13 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
if (src.equals(dst)) {
return true;
}
- FileStatus dstFileStatus = getFileStatus(dst);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
- String sourceFileName = src.getName();
Path adjustedDst = dst;
- if (dstFileStatus != null) {
+ if (entry != null) {
+ FileStatus dstFileStatus = getFileStatus(dst);
+ String sourceFileName = src.getName();
if (!dstFileStatus.isDirectory()) {
return false;
}
@@ -175,18 +185,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
- public boolean delete(Path path, boolean recursive) {
+ public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
return true;
}
+ FileStatus fileStatus = getFileStatus(path);
+
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@@ -222,9 +234,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
@@ -233,6 +245,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
+ FileStatus fileStatus = getFileStatus(path);
+
if (fileStatus.isDirectory()) {
return true;
} else {
@@ -241,7 +255,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
- public FileStatus getFileStatus(Path path) {
+ public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus path: {}", path);
@@ -335,9 +349,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
+ final boolean createParent) throws
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
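The hunks above replace the hard-coded 16 MB buffer with the fs.seaweed.* keys introduced at the top of the class. A sketch of setting them before opening the filesystem; the host and values shown are illustrative, with defaults taken from this diff.

    import org.apache.hadoop.conf.Configuration;

    public class SeaweedFsTuning {
        public static Configuration tuned() {
            Configuration conf = new Configuration();
            conf.set("fs.seaweed.filer.host", "localhost");         // filer host
            conf.setInt("fs.seaweed.filer.port", 8888);             // default filer port
            conf.setInt("fs.seaweed.buffer.size", 4 * 1024 * 1024); // the new 4 MB default
            conf.set("fs.seaweed.replication", "000");              // SeaweedFS placement string
            conf.set("fs.seaweed.volume.server.access", "direct");  // or "publicUrl" / "filerProxy"
            return conf;
        }
    }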
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 774c090e8..f4e8c9349 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -1,5 +1,7 @@
package seaweed.hdfs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -7,30 +9,43 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
+import seaweedfs.client.*;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static seaweed.hdfs.SeaweedFileSystem.*;
+
public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
- private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient;
+ private Configuration conf;
- public SeaweedFileSystemStore(String host, int port) {
+ public SeaweedFileSystemStore(String host, int port, Configuration conf) {
int grpcPort = 10000 + port;
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
- filerClient = new FilerClient(filerGrpcClient);
+ filerClient = new FilerClient(host, grpcPort);
+ this.conf = conf;
+ String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
+ if (volumeServerAccessMode.equals("publicUrl")) {
+ filerClient.setAccessVolumeServerByPublicUrl();
+ } else if (volumeServerAccessMode.equals("filerProxy")) {
+ filerClient.setAccessVolumeServerByFilerProxy();
+ }
+
+ }
+
+ public void close() {
+ try {
+ this.filerClient.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
public static String getParentDirectory(Path path) {
@@ -61,9 +76,19 @@ public class SeaweedFileSystemStore {
);
}
- public FileStatus[] listEntries(final Path path) {
+ public FileStatus[] listEntries(final Path path) throws IOException {
LOG.debug("listEntries path: {}", path);
+ FileStatus pathStatus = getFileStatus(path);
+
+ if (pathStatus == null) {
+ return new FileStatus[0];
+ }
+
+ if (!pathStatus.isDirectory()) {
+ return new FileStatus[]{pathStatus};
+ }
+
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
@@ -74,14 +99,16 @@ public class SeaweedFileSystemStore {
fileStatuses.add(fileStatus);
}
+ LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
return fileStatuses.toArray(new FileStatus[0]);
+
}
- public FileStatus getFileStatus(final Path path) {
+ public FileStatus getFileStatus(final Path path) throws IOException {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
- return null;
+ throw new FileNotFoundException("File does not exist: " + path);
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
@@ -111,10 +138,10 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = SeaweedRead.totalSize(entry.getChunksList());
+ long length = SeaweedRead.fileSize(entry);
boolean isDir = entry.getIsDirectory();
int block_replication = 1;
- int blocksize = 512;
+ int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
long modification_time = attributes.getMtime() * 1000; // milliseconds
long access_time = 0;
FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
@@ -124,7 +151,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
- private FilerProto.Entry lookupEntry(Path path) {
+ public FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
@@ -170,9 +197,10 @@ public class SeaweedFileSystemStore {
if (existingEntry != null) {
entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry);
+ entry.clearContent();
entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+ writePosition = SeaweedRead.fileSize(existingEntry);
replication = existingEntry.getAttributes().getReplication();
}
}
@@ -189,30 +217,27 @@ public class SeaweedFileSystemStore {
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+ return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
}
- public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
- int bufferSize) throws IOException {
+ public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
- LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+ LOG.debug("openFileForRead path:{}", path);
- int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
throw new FileNotFoundException("read non-exist file " + path);
}
- return new SeaweedInputStream(filerGrpcClient,
+ return new SeaweedHadoopInputStream(filerClient,
statistics,
path.toUri().getPath(),
- entry,
- bufferSize,
- readAheadQueueDepth);
+ entry);
}
public void setOwner(Path path, String owner, String group) {
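With these changes, getFileStatus throws FileNotFoundException for missing paths (the Hadoop contract) instead of returning null, and lookupEntry is made public so callers can probe for existence without catching the exception. A sketch of the resulting caller pattern, mirroring the delete() hunk above; the helper name is illustrative.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import seaweedfs.client.FilerProto;

    public class ExistenceCheckSketch {
        // 'store' is assumed to be a SeaweedFileSystemStore as modified in this diff.
        static boolean deleteIfPresent(SeaweedFileSystemStore store, Path path,
                                       boolean recursive) throws IOException {
            FilerProto.Entry entry = store.lookupEntry(path); // null means absent
            if (entry == null) {
                return true; // Hadoop treats deleting a missing path as success
            }
            FileStatus status = store.getFileStatus(path); // safe: entry exists
            return store.deleteEntries(path, status.isDirectory(), recursive);
        }
    }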
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..f26eae597
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+ private final SeaweedInputStream seaweedInputStream;
+ private final Statistics statistics;
+
+ public SeaweedHadoopInputStream(
+ final FilerClient filerClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
+ this.statistics = statistics;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seaweedInputStream.read();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return seaweedInputStream.read(b, off, len);
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int bytesRead = seaweedInputStream.read(buf);
+
+ if (bytesRead > 0) {
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ seaweedInputStream.seek(n);
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ return seaweedInputStream.skip(n);
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ return seaweedInputStream.available();
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ return seaweedInputStream.length();
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return seaweedInputStream.getPos();
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ seaweedInputStream.close();
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Gets whether mark and reset are supported by this stream. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
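Because SeaweedHadoopInputStream implements ByteBufferReadable, callers can use the read(ByteBuffer) path through FSDataInputStream. A usage sketch follows; the path and buffer size are illustrative.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ByteBufferReadSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            try (FSDataInputStream in = fs.open(new Path("/some/file"))) {
                ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
                int n = in.read(buf); // dispatches to ByteBufferReadable.read(ByteBuffer)
                System.out.println("read " + n + " bytes");
            }
        }
    }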
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..da5b56bbc
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,16 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream {
+
+ public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ super(filerClient, path, entry, position, bufferSize, replication);
+ }
+
+}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
deleted file mode 100644
index 90c14c772..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package seaweed.hdfs;
-
-// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.List;
-
-public class SeaweedInputStream extends FSInputStream {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
- private final String path;
- private final FilerProto.Entry entry;
- private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
- private final long contentLength;
- private final int bufferSize; // default buffer size
- private final int readAheadQueueDepth; // initialized in constructor
- private final boolean readAheadEnabled; // whether read-ahead is enabled
-
- private byte[] buffer = null; // will be initialized on first use
-
- private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
- private long fCursorAfterLastRead = -1;
- private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
- private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
- // of valid bytes in buffer)
- private boolean closed = false;
-
- public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry,
- final int bufferSize,
- final int readAheadQueueDepth) {
- this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
- this.path = path;
- this.entry = entry;
- this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
- this.bufferSize = bufferSize;
- this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
- this.readAheadEnabled = true;
-
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
-
- LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
-
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- int numberOfBytesRead = read(b, 0, 1);
- if (numberOfBytesRead < 0) {
- return -1;
- } else {
- return (b[0] & 0xFF);
- }
- }
-
- @Override
- public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
- int currentOff = off;
- int currentLen = len;
- int lastReadBytes;
- int totalReadBytes = 0;
- do {
- lastReadBytes = readOneBlock(b, currentOff, currentLen);
- if (lastReadBytes > 0) {
- currentOff += lastReadBytes;
- currentLen -= lastReadBytes;
- totalReadBytes += lastReadBytes;
- }
- if (currentLen <= 0 || currentLen > b.length - currentOff) {
- break;
- }
- } while (lastReadBytes > 0);
- return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
- }
-
- private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
-
- Preconditions.checkNotNull(b);
-
- if (len == 0) {
- return 0;
- }
-
- if (this.available() == 0) {
- return -1;
- }
-
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- //If buffer is empty, then fill the buffer.
- if (bCursor == limit) {
- //If EOF, then return -1
- if (fCursor >= contentLength) {
- return -1;
- }
-
- long bytesRead = 0;
- //reset buffer to initial state - i.e., throw away existing data
- bCursor = 0;
- limit = 0;
- if (buffer == null) {
- buffer = new byte[bufferSize];
- }
-
- // Enable readAhead when reading sequentially
- if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
- }
-
- if (bytesRead == -1) {
- return -1;
- }
-
- limit += bytesRead;
- fCursor += bytesRead;
- fCursorAfterLastRead = fCursor;
- }
-
- //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
- //(bytes returned may be less than requested)
- int bytesRemaining = limit - bCursor;
- int bytesToRead = Math.min(len, bytesRemaining);
- System.arraycopy(buffer, bCursor, b, off, bytesToRead);
- bCursor += bytesToRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesToRead);
- }
- return bytesToRead;
- }
-
-
- private int readInternal(final long position, final byte[] b, final int offset, final int length,
- final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
- // try reading from read-ahead
- if (offset != 0) {
- throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
- }
- int receivedBytes;
-
- // queue read-aheads
- int numReadAheads = this.readAheadQueueDepth;
- long nextSize;
- long nextOffset = position;
- while (numReadAheads > 0 && nextOffset < contentLength) {
- nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
- ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
- nextOffset = nextOffset + nextSize;
- numReadAheads--;
- }
-
- // try reading from buffers first
- receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
- if (receivedBytes > 0) {
- return receivedBytes;
- }
-
- // got nothing from read-ahead, do our own read now
- receivedBytes = readRemote(position, b, offset, length);
- return receivedBytes;
- } else {
- return readRemote(position, b, offset, length);
- }
- }
-
- int readRemote(long position, byte[] b, int offset, int length) throws IOException {
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
- if (b == null) {
- throw new IllegalArgumentException("null byte array passed in to read() method");
- }
- if (offset >= b.length) {
- throw new IllegalArgumentException("offset greater than length of array");
- }
- if (length < 0) {
- throw new IllegalArgumentException("requested read length is less than zero");
- }
- if (length > (b.length - offset)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
- }
-
- long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
- if (bytesRead > Integer.MAX_VALUE) {
- throw new IOException("Unexpected Content-Length");
- }
- return (int) bytesRead;
- }
-
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
- public synchronized void seek(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
- }
- if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
-
- if (n >= fCursor - limit && n <= fCursor) { // within buffer
- bCursor = (int) (n - (fCursor - limit));
- return;
- }
-
- // next read will read from here
- fCursor = n;
-
- //invalidate buffer
- limit = 0;
- bCursor = 0;
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- long currentPos = getPos();
- if (currentPos == contentLength) {
- if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
- }
- long newPos = currentPos + n;
- if (newPos < 0) {
- newPos = 0;
- n = newPos - currentPos;
- }
- if (newPos > contentLength) {
- newPos = contentLength;
- n = newPos - currentPos;
- }
- seek(newPos);
- return n;
- }
-
- /**
- * Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- * <p>
- * This is to match the behavior of DFSInputStream.available(),
- * which some clients may rely on (HBase write-ahead log reading in
- * particular).
- */
- @Override
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
- }
- final long remaining = this.contentLength - this.getPos();
- return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
- }
-
- /**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
- * they won't be reflected in the returned length.
- *
- * @return length of the file.
- * @throws IOException if the stream is closed
- */
- public long length() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return contentLength;
- }
-
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public synchronized long getPos() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return fCursor - limit + bCursor;
- }
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- closed = true;
- buffer = null; // de-reference the buffer so it can be GC'ed sooner
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Gets whether mark and reset are supported by this stream. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-}
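The deleted stream's cursor arithmetic is worth spelling out: fCursor is the file offset just past the buffered region, limit counts the valid bytes in the buffer, and bCursor is the position within the buffer, so getPos() returns fCursor - limit + bCursor. A tiny worked example under those definitions:

    public class CursorArithmeticDemo {
        public static void main(String[] args) {
            long fCursor = 8192; // next byte to fetch from the remote server
            int limit = 4096;    // buffer holds the file range [4096, 8192)
            int bCursor = 100;   // next byte to hand back to the caller
            long pos = fCursor - limit + bCursor; // = 4196, the logical position
            System.out.println("getPos() would report " + pos);
        }
    }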
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index ca53ffd22..590d725d0 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -15,8 +15,8 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -120,6 +120,188 @@
</plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>javax.servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-server</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-servlet</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-webapp</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-servlet</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-beanutils</artifactId>
+ <groupId>commons-beanutils</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration2</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>re2j</artifactId>
+ <groupId>com.google.re2j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>kerb-simplekdc</artifactId>
+ <groupId>org.apache.kerby</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-databind</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +309,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index f5207213c..2b11035f5 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.2.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
@@ -31,8 +31,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -147,6 +147,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
new file mode 100644
index 000000000..3d0b68a52
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
@@ -0,0 +1,25 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
+
+ public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
+ super(in, size);
+ if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+ throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (this.in instanceof ByteBufferReadable) {
+ return ((ByteBufferReadable)this.in).read(buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ }
+ }
+}
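BufferedFSInputStream alone would hide the wrapped stream's ByteBufferReadable capability, which is why this subclass re-exposes read(ByteBuffer) by delegating. A sketch of how SeaweedFileSystem.open() above uses it, with the 4x multiplier taken from that hunk; the wrapper method here is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSInputStream;

    public class WrapSketch {
        static FSDataInputStream wrap(FSInputStream in, Configuration conf) {
            int seaweedBufferSize = conf.getInt("fs.seaweed.buffer.size", 4 * 1024 * 1024);
            // The buffered wrapper preserves ByteBufferReadable via the subclass.
            return new FSDataInputStream(
                    new BufferedByteBufferReadableInputStream(in, 4 * seaweedBufferSize));
        }
    }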
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
- private SeaweedInputStream stream;
- private long offset; // offset within the file for the buffer
- private int length; // actual length, set after the buffer is filled
- private int requestedLength; // requested length of the read
- private byte[] buffer; // the buffer itself
- private int bufferindex = -1; // index in the buffers array in Buffer manager
- private ReadBufferStatus status; // status of the buffer
- private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
- // waiting on this buffer gets unblocked
-
- // fields to help with eviction logic
- private long timeStamp = 0; // tick at which buffer became available to read
- private boolean isFirstByteConsumed = false;
- private boolean isLastByteConsumed = false;
- private boolean isAnyByteConsumed = false;
-
- public SeaweedInputStream getStream() {
- return stream;
- }
-
- public void setStream(SeaweedInputStream stream) {
- this.stream = stream;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public int getLength() {
- return length;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public int getRequestedLength() {
- return requestedLength;
- }
-
- public void setRequestedLength(int requestedLength) {
- this.requestedLength = requestedLength;
- }
-
- public byte[] getBuffer() {
- return buffer;
- }
-
- public void setBuffer(byte[] buffer) {
- this.buffer = buffer;
- }
-
- public int getBufferindex() {
- return bufferindex;
- }
-
- public void setBufferindex(int bufferindex) {
- this.bufferindex = bufferindex;
- }
-
- public ReadBufferStatus getStatus() {
- return status;
- }
-
- public void setStatus(ReadBufferStatus status) {
- this.status = status;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public boolean isFirstByteConsumed() {
- return isFirstByteConsumed;
- }
-
- public void setFirstByteConsumed(boolean isFirstByteConsumed) {
- this.isFirstByteConsumed = isFirstByteConsumed;
- }
-
- public boolean isLastByteConsumed() {
- return isLastByteConsumed;
- }
-
- public void setLastByteConsumed(boolean isLastByteConsumed) {
- this.isLastByteConsumed = isLastByteConsumed;
- }
-
- public boolean isAnyByteConsumed() {
- return isAnyByteConsumed;
- }
-
- public void setAnyByteConsumed(boolean isAnyByteConsumed) {
- this.isAnyByteConsumed = isAnyByteConsumed;
- }
-
-}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
deleted file mode 100644
index 5b1e21529..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package seaweed.hdfs;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * The read buffer manager for SeaweedFS read-ahead, adapted from the Azure ABFS client.
- */
-final class ReadBufferManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-
- private static final int NUM_BUFFERS = 16;
- private static final int BLOCK_SIZE = 4 * 1024 * 1024;
- private static final int NUM_THREADS = 8;
- private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
-
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
- private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
- static {
- BUFFER_MANAGER = new ReadBufferManager();
- BUFFER_MANAGER.init();
- }
-
- static ReadBufferManager getBufferManager() {
- return BUFFER_MANAGER;
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[BLOCK_SIZE]; // the same buffers are reused; the byte arrays are never garbage collected
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("SeaweedFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- }
-
-
- /*
- *
- * SeaweedInputStream-facing methods
- *
- */
-
-
- /**
- * {@link SeaweedInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
- * @param requestedOffset The offset in the file which should be read
- * @param requestedLength The length to read
- */
- void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
-
-
- /**
- * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
- * remote read). This returns the bytes if the data already exists in the buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
- * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
- * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
- *
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read
- */
- int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
-
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
- *
- */
-
- private void waitForProcess(final SeaweedInputStream stream, final long position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if in in-progress queue, then block for it
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's thread
- // Note on correctness: readBuf gets out of inProgressList in only one place: after the worker thread
- // is done processing it (in doneReading). There, the latch is set after removing the buffer from
- // inProgressList. So this latch is safe to be awaited outside the synchronized block.
- // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
- // while waiting, so no one would be able to change any state. If this becomes more complex in the future,
- // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
-
- /**
- * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
- * The objective is to find just one buffer - there is no advantage to evicting more than one.
- *
- * @return whether the eviction succeeded - i.e., were we able to free up one buffer
- */
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
-
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- long earliestBirthday = Long.MAX_VALUE;
- for (ReadBuffer buf : completedReadList) {
- if (buf.getTimeStamp() < earliestBirthday) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- }
- }
- if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- freeList.push(buf.getBufferindex());
- completedReadList.remove(buf);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
-
- private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
- final byte[] buffer) {
- ReadBuffer buf = getFromList(completedReadList, stream, position);
- if (buf == null || position >= buf.getOffset() + buf.getLength()) {
- return 0;
- }
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
-
- /*
- *
- * ReadBufferWorker-thread-facing methods
- *
- */
-
- /**
- * ReadBufferWorker thread calls this to get the next buffer that it should work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
- */
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
-
- /**
- * ReadBufferWorker thread calls this method to post completion.
- *
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
- */
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
- }
- synchronized (this) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setTimeStamp(currentTimeMillis());
- buffer.setLength(bytesActuallyRead);
- completedReadList.add(buffer);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
- }
- }
- //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
-
- /**
- * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
- * System.currentTimeMillis can go backwards when the system clock is changed (e.g., by NTP time synchronization),
- * making it unsuitable for measuring time intervals. System.nanoTime() is strictly monotonically increasing per CPU core.
- * Note: it is not monotonic across CPU sockets, and even within a CPU, it is only the
- * more recent parts that share a clock across all cores.
- *
- * @return current time in milliseconds
- */
- private long currentTimeMillis() {
- return System.nanoTime() / 1000 / 1000;
- }
-}
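The currentTimeMillis() helper removed above is the standard trick for interval timing: derive milliseconds from System.nanoTime() so that wall-clock adjustments cannot skew buffer-age measurements. A minimal standalone sketch of the same idea (class and method names are illustrative, not from this diff):

import java.util.concurrent.TimeUnit;

public class MonotonicClockDemo {
    // Millisecond reading derived from System.nanoTime(), which, unlike
    // System.currentTimeMillis(), is unaffected by NTP clock steps.
    static long monotonicMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    public static void main(String[] args) throws InterruptedException {
        long start = monotonicMillis();
        Thread.sleep(100); // stand-in for the work being timed
        System.out.println("elapsed ~" + (monotonicMillis() - start) + " ms");
    }
}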
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
deleted file mode 100644
index 6ffbc4644..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBufferWorker implements Runnable {
-
- protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
- private int id;
-
- ReadBufferWorker(final int id) {
- this.id = id;
- }
-
- /**
- * return the ID of ReadBufferWorker.
- */
- public int getId() {
- return this.id;
- }
-
- /**
- * Waits until a buffer becomes available in ReadAheadQueue.
- * Once a buffer becomes available, reads the file specified in it and then posts results back to the buffer manager.
- * Rinse and repeat. Forever.
- */
- public void run() {
- try {
- UNLEASH_WORKERS.await();
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- ReadBuffer buffer;
- while (true) {
- try {
- buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- if (buffer != null) {
- try {
- // do the actual read, from the file.
- int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
- bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
- } catch (Exception ex) {
- bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
- }
- }
- }
- }
-}
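The UNLEASH_WORKERS latch in the deleted worker is a one-shot start gate: every worker blocks at the top of run() until ReadBufferManager.init() has finished wiring up the shared state. A small sketch of the pattern under illustrative names:

import java.util.concurrent.CountDownLatch;

public class StartGateDemo {
    // One-shot gate: workers wait here until shared initialization completes.
    static final CountDownLatch START = new CountDownLatch(1);

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    START.await(); // park until the coordinator opens the gate
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                System.out.println("worker " + id + " running");
            }).start();
        }
        // ... shared initialization would happen here ...
        START.countDown(); // release all waiting workers at once
    }
}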
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
index d63674977..e021401aa 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
@@ -18,12 +18,18 @@
package seaweed.hdfs;
-/**
- * The ReadBufferStatus values for read-ahead buffers, adapted from the Azure ABFS client
- */
-public enum ReadBufferStatus {
- NOT_AVAILABLE, // buffers sitting in the read-ahead queue have this status
- READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
- AVAILABLE, // data is available in buffer. It should be in completedList
- READ_FAILED // read completed, but failed.
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
+
+ SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
+ throws IOException, URISyntaxException {
+ super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
+ }
+
}
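SeaweedAbstractFileSystem extends DelegateToFileSystem, Hadoop's bridge for exposing a classic FileSystem through the newer FileContext API. For FileContext callers to resolve seaweedfs:// URIs, the class is typically bound through the fs.AbstractFileSystem.<scheme>.impl key; a hedged sketch (the property name follows Hadoop's convention, the value is the class added above):

import org.apache.hadoop.conf.Configuration;

public class FileContextBindingSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // FileContext resolves schemes through fs.AbstractFileSystem.<scheme>.impl.
        conf.set("fs.AbstractFileSystem.seaweedfs.impl",
                "seaweed.hdfs.SeaweedAbstractFileSystem");
        System.out.println(conf.get("fs.AbstractFileSystem.seaweedfs.impl"));
    }
}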
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index c12da8261..25395db7a 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,31 +5,31 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-
public class SeaweedFileSystem extends FileSystem {
- public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+ public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+ public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
+ public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
+ public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access";
+ public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -60,16 +60,20 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
- seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf);
}
@Override
+ public void close() throws IOException {
+ super.close();
+ this.seaweedFileSystemStore.close();
+ }
+
+ @Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
@@ -77,8 +81,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(inputStream);
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
+ return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
@@ -94,8 +99,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- String replicaPlacement = String.format("%03d", replication - 1);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+ String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@@ -105,8 +111,9 @@ public class SeaweedFileSystem extends FileSystem {
/**
* {@inheritDoc}
+ *
* @throws FileNotFoundException if the parent directory is not present -or
- * is not a directory.
+ * is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
@@ -123,9 +130,10 @@ public class SeaweedFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize,
- replication, blockSize, progress);
+ replication, seaweedBufferSize, progress);
}
@Override
@@ -135,7 +143,8 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@@ -144,7 +153,7 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
- public boolean rename(Path src, Path dst) {
+ public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename path: {} => {}", src, dst);
@@ -155,12 +164,13 @@ public class SeaweedFileSystem extends FileSystem {
if (src.equals(dst)) {
return true;
}
- FileStatus dstFileStatus = getFileStatus(dst);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
- String sourceFileName = src.getName();
Path adjustedDst = dst;
- if (dstFileStatus != null) {
+ if (entry != null) {
+ FileStatus dstFileStatus = getFileStatus(dst);
+ String sourceFileName = src.getName();
if (!dstFileStatus.isDirectory()) {
return false;
}
@@ -175,18 +185,20 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
- public boolean delete(Path path, boolean recursive) {
+ public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
return true;
}
+ FileStatus fileStatus = getFileStatus(path);
+
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@@ -222,9 +234,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
@@ -233,6 +245,8 @@ public class SeaweedFileSystem extends FileSystem {
}
+ FileStatus fileStatus = getFileStatus(path);
+
if (fileStatus.isDirectory()) {
return true;
} else {
@@ -241,7 +255,7 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
- public FileStatus getFileStatus(Path path) {
+ public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus path: {}", path);
@@ -335,9 +349,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
+ final boolean createParent) throws
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
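The hunks above replace the hard-coded 16 MB buffer with per-job configuration keys (fs.seaweed.buffer.size, fs.seaweed.replication, fs.seaweed.volume.server.access). A hedged sketch of client-side setup, assuming the SeaweedFS HDFS jar is on the classpath and registers the seaweedfs:// scheme; host, port, and values are illustrative:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SeaweedConfSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.seaweed.filer.host", "localhost");          // illustrative filer host
        conf.setInt("fs.seaweed.filer.port", 8888);              // matches FS_SEAWEED_DEFAULT_PORT
        conf.setInt("fs.seaweed.buffer.size", 4 * 1024 * 1024);  // matches FS_SEAWEED_DEFAULT_BUFFER_SIZE
        conf.set("fs.seaweed.replication", "000");               // overrides the per-call replication factor

        try (FileSystem fs = FileSystem.get(new URI("seaweedfs://localhost:8888/"), conf)) {
            System.out.println("connected to " + fs.getUri());
        }
    }
}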
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 774c090e8..f4e8c9349 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -1,5 +1,7 @@
package seaweed.hdfs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -7,30 +9,43 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
+import seaweedfs.client.*;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static seaweed.hdfs.SeaweedFileSystem.*;
+
public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
- private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient;
+ private Configuration conf;
- public SeaweedFileSystemStore(String host, int port) {
+ public SeaweedFileSystemStore(String host, int port, Configuration conf) {
int grpcPort = 10000 + port;
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
- filerClient = new FilerClient(filerGrpcClient);
+ filerClient = new FilerClient(host, grpcPort);
+ this.conf = conf;
+ String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
+ if (volumeServerAccessMode.equals("publicUrl")) {
+ filerClient.setAccessVolumeServerByPublicUrl();
+ } else if (volumeServerAccessMode.equals("filerProxy")) {
+ filerClient.setAccessVolumeServerByFilerProxy();
+ }
+
+ }
+
+ public void close() {
+ try {
+ this.filerClient.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
public static String getParentDirectory(Path path) {
@@ -61,9 +76,19 @@ public class SeaweedFileSystemStore {
);
}
- public FileStatus[] listEntries(final Path path) {
+ public FileStatus[] listEntries(final Path path) throws IOException {
LOG.debug("listEntries path: {}", path);
+ FileStatus pathStatus = getFileStatus(path);
+
+ if (pathStatus == null) {
+ return new FileStatus[0];
+ }
+
+ if (!pathStatus.isDirectory()) {
+ return new FileStatus[]{pathStatus};
+ }
+
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
@@ -74,14 +99,16 @@ public class SeaweedFileSystemStore {
fileStatuses.add(fileStatus);
}
+ LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
return fileStatuses.toArray(new FileStatus[0]);
+
}
- public FileStatus getFileStatus(final Path path) {
+ public FileStatus getFileStatus(final Path path) throws IOException {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
- return null;
+ throw new FileNotFoundException("File does not exist: " + path);
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
@@ -111,10 +138,10 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = SeaweedRead.totalSize(entry.getChunksList());
+ long length = SeaweedRead.fileSize(entry);
boolean isDir = entry.getIsDirectory();
int block_replication = 1;
- int blocksize = 512;
+ int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
long modification_time = attributes.getMtime() * 1000; // milliseconds
long access_time = 0;
FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
@@ -124,7 +151,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
- private FilerProto.Entry lookupEntry(Path path) {
+ public FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
@@ -170,9 +197,10 @@ public class SeaweedFileSystemStore {
if (existingEntry != null) {
entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry);
+ entry.clearContent();
entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+ writePosition = SeaweedRead.fileSize(existingEntry);
replication = existingEntry.getAttributes().getReplication();
}
}
@@ -189,30 +217,27 @@ public class SeaweedFileSystemStore {
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+ return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
}
- public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
- int bufferSize) throws IOException {
+ public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
- LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+ LOG.debug("openFileForRead path:{}", path);
- int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
throw new FileNotFoundException("read non-exist file " + path);
}
- return new SeaweedInputStream(filerGrpcClient,
+ return new SeaweedHadoopInputStream(filerClient,
statistics,
path.toUri().getPath(),
- entry,
- bufferSize,
- readAheadQueueDepth);
+ entry);
}
public void setOwner(Path path, String owner, String group) {
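The rewritten constructor reads fs.seaweed.volume.server.access and switches the FilerClient among three modes: direct (the default), publicUrl, and filerProxy. The selection logic in isolation, using the FilerClient methods named in the hunk above:

import seaweedfs.client.FilerClient;

public class VolumeAccessSketch {
    // "direct" is the default and requires no call on the client.
    static void applyAccessMode(FilerClient filerClient, String mode) {
        if ("publicUrl".equals(mode)) {
            filerClient.setAccessVolumeServerByPublicUrl();  // read via the volume servers' public URLs
        } else if ("filerProxy".equals(mode)) {
            filerClient.setAccessVolumeServerByFilerProxy(); // tunnel volume traffic through the filer
        }
    }
}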
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..f26eae597
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+ private final SeaweedInputStream seaweedInputStream;
+ private final Statistics statistics;
+
+ public SeaweedHadoopInputStream(
+ final FilerClient filerClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
+ this.statistics = statistics;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seaweedInputStream.read();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return seaweedInputStream.read(b, off, len);
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int bytesRead = seaweedInputStream.read(buf);
+
+ if (bytesRead > 0) {
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ seaweedInputStream.seek(n);
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ return seaweedInputStream.skip(n);
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ return seaweedInputStream.available();
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ return seaweedInputStream.length();
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return seaweedInputStream.getPos();
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ seaweedInputStream.close();
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Gets whether mark and reset are supported by this stream. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
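Because the wrapper implements ByteBufferReadable, FSDataInputStream.read(ByteBuffer) dispatches straight into SeaweedInputStream.read(ByteBuffer), avoiding an intermediate byte[] copy. A hedged usage sketch (the path is illustrative):

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ByteBufferReadSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        try (FSDataInputStream in = fs.open(new Path("/demo.txt"))) { // illustrative path
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
            // Delegates to the wrapped stream's read(ByteBuffer) when it
            // implements ByteBufferReadable, as SeaweedHadoopInputStream does.
            int n = in.read(buf);
            System.out.println("read " + n + " bytes");
        }
    }
}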
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..1740312fe
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,64 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+
+ public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ super(filerClient, path, entry, position, bufferSize, replication);
+ }
+
+ /**
+ * Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the disk device (but the disk may have it in its cache).
+ *
+ * @throws IOException if error occurs
+ */
+ @Override
+ public void hsync() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Flush out the data in client's user buffer. After the return of
+ * this call, new readers will see the data.
+ *
+ * @throws IOException if any error occurs
+ */
+ @Override
+ public void hflush() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Query the stream for a specific capability.
+ *
+ * @param capability string to query the stream support for.
+ * @return true for hsync and hflush.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ switch (capability.toLowerCase(Locale.ENGLISH)) {
+ case StreamCapabilities.HSYNC:
+ case StreamCapabilities.HFLUSH:
+ return supportFlush;
+ default:
+ return false;
+ }
+ }
+
+}
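hasCapability lets callers probe for durability guarantees before depending on them, which is how HBase decides whether a stream is safe for write-ahead logs. A small caller-side sketch, assuming a Hadoop version where FSDataOutputStream implements StreamCapabilities (the method name is illustrative):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.StreamCapabilities;

public class FlushCapabilitySketch {
    // Prefer hflush() when the stream advertises it; otherwise settle for flush().
    static void flushVisibly(FSDataOutputStream out) throws IOException {
        if (out.hasCapability(StreamCapabilities.HFLUSH)) {
            out.hflush(); // data visible to new readers on return
        } else {
            out.flush();
        }
    }
}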
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
deleted file mode 100644
index 90c14c772..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package seaweed.hdfs;
-
-// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.List;
-
-public class SeaweedInputStream extends FSInputStream {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
- private final String path;
- private final FilerProto.Entry entry;
- private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
- private final long contentLength;
- private final int bufferSize; // default buffer size
- private final int readAheadQueueDepth; // initialized in constructor
- private final boolean readAheadEnabled; // whether enable readAhead;
-
- private byte[] buffer = null; // will be initialized on first use
-
- private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
- private long fCursorAfterLastRead = -1;
- private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
- private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
- // of valid bytes in buffer)
- private boolean closed = false;
-
- public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry,
- final int bufferSize,
- final int readAheadQueueDepth) {
- this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
- this.path = path;
- this.entry = entry;
- this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
- this.bufferSize = bufferSize;
- this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
- this.readAheadEnabled = true;
-
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
-
- LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
-
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- int numberOfBytesRead = read(b, 0, 1);
- if (numberOfBytesRead < 0) {
- return -1;
- } else {
- return (b[0] & 0xFF);
- }
- }
-
- @Override
- public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
- int currentOff = off;
- int currentLen = len;
- int lastReadBytes;
- int totalReadBytes = 0;
- do {
- lastReadBytes = readOneBlock(b, currentOff, currentLen);
- if (lastReadBytes > 0) {
- currentOff += lastReadBytes;
- currentLen -= lastReadBytes;
- totalReadBytes += lastReadBytes;
- }
- if (currentLen <= 0 || currentLen > b.length - currentOff) {
- break;
- }
- } while (lastReadBytes > 0);
- return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
- }
-
- private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
-
- Preconditions.checkNotNull(b);
-
- if (len == 0) {
- return 0;
- }
-
- if (this.available() == 0) {
- return -1;
- }
-
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- //If buffer is empty, then fill the buffer.
- if (bCursor == limit) {
- //If EOF, then return -1
- if (fCursor >= contentLength) {
- return -1;
- }
-
- long bytesRead = 0;
- //reset buffer to initial state - i.e., throw away existing data
- bCursor = 0;
- limit = 0;
- if (buffer == null) {
- buffer = new byte[bufferSize];
- }
-
- // Enable readAhead when reading sequentially
- if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
- }
-
- if (bytesRead == -1) {
- return -1;
- }
-
- limit += bytesRead;
- fCursor += bytesRead;
- fCursorAfterLastRead = fCursor;
- }
-
- //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
- //(bytes returned may be less than requested)
- int bytesRemaining = limit - bCursor;
- int bytesToRead = Math.min(len, bytesRemaining);
- System.arraycopy(buffer, bCursor, b, off, bytesToRead);
- bCursor += bytesToRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesToRead);
- }
- return bytesToRead;
- }
-
-
- private int readInternal(final long position, final byte[] b, final int offset, final int length,
- final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
- // try reading from read-ahead
- if (offset != 0) {
- throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
- }
- int receivedBytes;
-
- // queue read-aheads
- int numReadAheads = this.readAheadQueueDepth;
- long nextSize;
- long nextOffset = position;
- while (numReadAheads > 0 && nextOffset < contentLength) {
- nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
- ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
- nextOffset = nextOffset + nextSize;
- numReadAheads--;
- }
-
- // try reading from buffers first
- receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
- if (receivedBytes > 0) {
- return receivedBytes;
- }
-
- // got nothing from read-ahead, do our own read now
- receivedBytes = readRemote(position, b, offset, length);
- return receivedBytes;
- } else {
- return readRemote(position, b, offset, length);
- }
- }
-
- int readRemote(long position, byte[] b, int offset, int length) throws IOException {
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
- if (b == null) {
- throw new IllegalArgumentException("null byte array passed in to read() method");
- }
- if (offset >= b.length) {
- throw new IllegalArgumentException("offset greater than length of array");
- }
- if (length < 0) {
- throw new IllegalArgumentException("requested read length is less than zero");
- }
- if (length > (b.length - offset)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
- }
-
- long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
- if (bytesRead > Integer.MAX_VALUE) {
- throw new IOException("Unexpected Content-Length");
- }
- return (int) bytesRead;
- }
-
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
- public synchronized void seek(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
- }
- if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
-
- if (n >= fCursor - limit && n <= fCursor) { // within buffer
- bCursor = (int) (n - (fCursor - limit));
- return;
- }
-
- // next read will read from here
- fCursor = n;
-
- //invalidate buffer
- limit = 0;
- bCursor = 0;
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- long currentPos = getPos();
- if (currentPos == contentLength) {
- if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
- }
- long newPos = currentPos + n;
- if (newPos < 0) {
- newPos = 0;
- n = newPos - currentPos;
- }
- if (newPos > contentLength) {
- newPos = contentLength;
- n = newPos - currentPos;
- }
- seek(newPos);
- return n;
- }
-
- /**
- * Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- * <p>
- * This is to match the behavior of DFSInputStream.available(),
- * which some clients may rely on (HBase write-ahead log reading in
- * particular).
- */
- @Override
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
- }
- final long remaining = this.contentLength - this.getPos();
- return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
- }
-
- /**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
- * they won't be reflected in the returned length.
- *
- * @return length of the file.
- * @throws IOException if the stream is closed
- */
- public long length() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return contentLength;
- }
-
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public synchronized long getPos() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return fCursor - limit + bCursor;
- }
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- closed = true;
- buffer = null; // de-reference the buffer so it can be GC'ed sooner
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Gets whether mark and reset are supported by this stream. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-}
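The deleted stream tracked its position with three cursors, and getPos() recovered the caller's logical offset as fCursor - limit + bCursor: the start of the buffered window plus progress within it. A tiny check of that arithmetic with illustrative values:

public class CursorMathSketch {
    public static void main(String[] args) {
        long fCursor = 8_388_608; // file offset one past the last byte fetched into the buffer
        int limit = 4_194_304;    // count of valid bytes currently in the buffer
        int bCursor = 1_024;      // bytes already returned to the caller from the buffer
        // The buffered window covers [fCursor - limit, fCursor), so:
        long pos = fCursor - limit + bCursor;
        System.out.println("logical position = " + pos); // 4195328
    }
}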
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
deleted file mode 100644
index 4f307ff96..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ /dev/null
@@ -1,335 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.util.Locale;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
-public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Path path;
- private final int bufferSize;
- private final int maxConcurrentRequestCount;
- private final ThreadPoolExecutor threadExecutor;
- private final ExecutorCompletionService<Void> completionService;
- private FilerProto.Entry.Builder entry;
- private long position;
- private boolean closed;
- private boolean supportFlush = true;
- private volatile IOException lastError;
- private long lastFlushOffset;
- private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private String replication = "000";
-
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
- this.filerGrpcClient = filerGrpcClient;
- this.replication = replication;
- this.path = path;
- this.position = position;
- this.closed = false;
- this.lastError = null;
- this.lastFlushOffset = 0;
- this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
- this.writeOperations = new ConcurrentLinkedDeque<>();
-
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
- this.entry = entry;
-
- }
-
- private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
-
- LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
-
- try {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
- } catch (Exception ex) {
- throw new IOException(ex);
- }
- this.lastFlushOffset = offset;
- }
-
- @Override
- public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
- }
-
- @Override
- public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
- maybeThrowLastError();
-
- Preconditions.checkArgument(data != null, "null data");
-
- if (off < 0 || length < 0 || length > data.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
- int numberOfBytesToWrite = length;
-
- while (numberOfBytesToWrite > 0) {
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
- }
-
- writableBytes = bufferSize - bufferIndex;
- }
- }
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be
- * written out. If any data remains in the payload it is committed to the
- * service. Data is queued for writing and forced out to the service
- * before the call returns.
- */
- @Override
- public void flush() throws IOException {
- if (supportFlush) {
- flushInternalAsync();
- }
- }
-
- /**
- * Similar to posix fsync, flush out the data in client's user buffer
- * all the way to the disk device (but the disk may have it in its cache).
- *
- * @throws IOException if error occurs
- */
- @Override
- public void hsync() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Flush out the data in client's user buffer. After the return of
- * this call, new readers will see the data.
- *
- * @throws IOException if any error occurs
- */
- @Override
- public void hflush() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Query the stream for a specific capability.
- *
- * @param capability string to query the stream support for.
- * @return true for hsync and hflush.
- */
- @Override
- public boolean hasCapability(String capability) {
- switch (capability.toLowerCase(Locale.ENGLISH)) {
- case StreamCapabilities.HSYNC:
- case StreamCapabilities.HFLUSH:
- return supportFlush;
- default:
- return false;
- }
- }
-
- /**
- * Force all data in the output stream to be written to Azure storage.
- * Wait to return until this is complete. Close the access to the stream and
- * shutdown the upload thread pool.
- * If the blob was created, its lease will be released.
- * Any error encountered caught in threads and stored will be rethrown here
- * after cleanup.
- */
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
-
- LOG.debug("close path: {}", path);
- try {
- flushInternal();
- threadExecutor.shutdown();
- } finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- buffer = null;
- bufferIndex = 0;
- closed = true;
- writeOperations.clear();
- if (!threadExecutor.isShutdown()) {
- threadExecutor.shutdownNow();
- }
- }
- }
-
- private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
- return;
- }
-
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
-
- buffer = new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
- position += bytesLength;
-
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
- }
-
- private void waitForTaskToComplete() throws IOException {
- boolean completed;
- for (completed = false; completionService.poll() != null; completed = true) {
- // keep polling until there is no data
- }
-
- if (!completed) {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
- throw lastError;
- }
- }
- }
-
- private void maybeThrowLastError() throws IOException {
- if (lastError != null) {
- throw lastError;
- }
- }
-
- /**
- * Try to remove the completed write operations from the beginning of write
- * operation FIFO queue.
- */
- private synchronized void shrinkWriteOperationQueue() throws IOException {
- try {
- while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
- writeOperations.peek().task.get();
- lastTotalAppendOffset += writeOperations.peek().length;
- writeOperations.remove();
- }
- } catch (Exception e) {
- lastError = new IOException(e);
- throw lastError;
- }
- }
-
- private synchronized void flushInternal() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToService();
- }
-
- private synchronized void flushInternalAsync() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToServiceAsync();
- }
-
- private synchronized void flushWrittenBytesToService() throws IOException {
- for (WriteOperation writeOperation : writeOperations) {
- try {
- writeOperation.task.get();
- } catch (Exception ex) {
- lastError = new IOException(ex);
- throw lastError;
- }
- }
- LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
- flushWrittenBytesToServiceInternal(position);
- }
-
- private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
- shrinkWriteOperationQueue();
-
- if (this.lastTotalAppendOffset > this.lastFlushOffset) {
- this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
- }
- }
-
- private static class WriteOperation {
- private final Future<Void> task;
- private final long startOffset;
- private final long length;
-
- WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
- this.task = task;
- this.startOffset = startOffset;
- this.length = length;
- }
- }
-
-}
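The deleted output stream's upload pipeline is the classic bounded-producer pattern over ExecutorCompletionService: buffers are submitted as tasks, and waitForTaskToComplete() drains completions when the queue grows past twice the thread count. A minimal sketch of the same mechanism under illustrative names:

import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<Integer> done = new ExecutorCompletionService<>(pool);

        for (int i = 0; i < 8; i++) {
            final int chunk = i;
            done.submit(() -> chunk * chunk); // stand-in for one async chunk upload
        }
        // Drain results as they complete, rethrowing any task failure,
        // much like flushWrittenBytesToService() iterating its write queue.
        for (int i = 0; i < 8; i++) {
            System.out.println("uploaded " + done.take().get());
        }
        pool.shutdown();
    }
}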
diff --git a/other/java/s3copier/pom.xml b/other/java/s3copier/pom.xml
index f8cb9e91c..c3ff30932 100644
--- a/other/java/s3copier/pom.xml
+++ b/other/java/s3copier/pom.xml
@@ -28,7 +28,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.1</version>
+ <version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
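The junit bump from 3.8.1 to 4.13.1 moves the test dependency from the TestCase-subclass era to annotation-driven JUnit 4 (4.13.1 also carries the fix for CVE-2020-15250 in TemporaryFolder). A sketch of the annotated style the newer version expects (the test class is illustrative, not from this diff):

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class CopierSmokeTest {
    @Test
    public void multiplies() {
        // JUnit 4 discovers tests via @Test rather than TestCase naming conventions.
        assertEquals(16, 4 * 4);
    }
}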