Diffstat (limited to 'other/java')

 other/java/client/pom.xml | 31
 other/java/client/pom_debug.xml | 139
 other/java/client/src/main/java/seaweedfs/client/ChunkCache.java | 27
 other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 194
 other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java | 54
 other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java | 64
 other/java/client/src/main/java/seaweedfs/client/Gzip.java | 37
 other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java | 55
 other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 173
 other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 108
 other/java/client/src/main/proto/filer.proto | 128
 other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java | 42
 other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java | 23
 other/java/hdfs2/dependency-reduced-pom.xml | 133
 other/java/hdfs2/pom.xml | 163
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java) | 0
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java) | 0
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java) | 0
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java) | 0
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java) | 208
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java) | 53
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java) | 0
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 280
 other/java/hdfs3/dependency-reduced-pom.xml | 133
 other/java/hdfs3/pom.xml (renamed from other/java/hdfs/pom.xml) | 8
 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java | 137
 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java | 394
 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java | 29
 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java | 70
 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 620
 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 282
 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java | 371
 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java (renamed from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java) | 3
 33 files changed, 3651 insertions(+), 308 deletions(-)
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 1ea39863f..05061e0f6 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.0.5</version>
+ <version>1.2.9</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -13,12 +14,18 @@
</parent>
<properties>
- <protobuf.version>3.5.1</protobuf.version>
- <grpc.version>1.16.1</grpc.version>
- <guava.version>26.0-jre</guava.version>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -74,7 +81,7 @@
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
- <version>1.5.0.Final</version>
+ <version>1.6.2</version>
</extension>
</extensions>
<plugins>
@@ -82,18 +89,20 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
- <version>0.5.1</version>
+ <version>0.6.1</version>
<configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}-1:exe:${os.detected.classifier}</protocArtifact>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
<pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
</configuration>
<executions>
<execution>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
new file mode 100644
index 000000000..1d8454bf7
--- /dev/null
+++ b/other/java/client/pom_debug.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.2.9</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
new file mode 100644
index 000000000..e249d4524
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -0,0 +1,27 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class ChunkCache {
+
+ private final Cache<String, byte[]> cache;
+
+ public ChunkCache(int maxEntries) {
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(1, TimeUnit.HOURS)
+ .build();
+ }
+
+ public byte[] getChunk(String fileId) {
+ return this.cache.getIfPresent(fileId);
+ }
+
+ public void setChunk(String fileId, byte[] data) {
+ this.cache.put(fileId, data);
+ }
+
+}
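The new ChunkCache gives the read path a bounded, access-expiring Guava cache keyed by fileId, so repeated reads of the same chunk can skip the HTTP round trip to the volume server. A minimal read-through sketch (the fileId value and the fetched bytes are illustrative only):

```java
import seaweedfs.client.ChunkCache;

import java.nio.charset.StandardCharsets;

public class ChunkCacheExample {
    public static void main(String[] args) {
        ChunkCache cache = new ChunkCache(1000);   // at most 1000 chunks, 1h access expiry
        String fileId = "3,01637037d6";            // hypothetical SeaweedFS file id
        byte[] data = cache.getChunk(fileId);      // null on a cache miss
        if (data == null) {
            data = "bytes fetched from a volume server".getBytes(StandardCharsets.UTF_8);
            cache.setChunk(fileId, data);          // prime the cache for later reads
        }
        System.out.println(data.length + " cached bytes for " + fileId);
    }
}
```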
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 63d0d8320..ef32c7e9a 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -7,13 +7,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
public class FilerClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
- private FilerGrpcClient filerGrpcClient;
+ private final FilerGrpcClient filerGrpcClient;
public FilerClient(String host, int grpcPort) {
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
@@ -34,13 +35,12 @@ public class FilerClient {
public boolean mkdirs(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
-
if ("/".equals(path)) {
return true;
}
+ Path pathObject = Paths.get(path);
+ String parent = pathObject.getParent().toString();
+ String name = pathObject.getFileName().toString();
mkdirs(parent, mode, uid, gid, userName, groupNames);
@@ -51,23 +51,38 @@ public class FilerClient {
}
return createEntry(
- parent,
- newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
+ parent,
+ newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build()
);
}
- public boolean rm(String path, boolean isRecursive) {
+ public boolean mv(String oldPath, String newPath) {
+
+ Path oldPathObject = Paths.get(oldPath);
+ String oldParent = oldPathObject.getParent().toString();
+ String oldName = oldPathObject.getFileName().toString();
+
+ Path newPathObject = Paths.get(newPath);
+ String newParent = newPathObject.getParent().toString();
+ String newName = newPathObject.getFileName().toString();
+
+ return atomicRenameEntry(oldParent, oldName, newParent, newName);
+
+ }
+
+ public boolean rm(String path, boolean isRecursive, boolean ignoreRecursiveError) {
Path pathObject = Paths.get(path);
String parent = pathObject.getParent().toString();
String name = pathObject.getFileName().toString();
return deleteEntry(
- parent,
- name,
- true,
- isRecursive);
+ parent,
+ name,
+ true,
+ isRecursive,
+ ignoreRecursiveError);
}
public boolean touch(String path, int mode) {
@@ -84,18 +99,18 @@ public class FilerClient {
FilerProto.Entry entry = lookupEntry(parent, name);
if (entry == null) {
return createEntry(
- parent,
- newFileEntry(name, mode, uid, gid, userName, groupNames).build()
+ parent,
+ newFileEntry(name, mode, uid, gid, userName, groupNames).build()
);
}
long now = System.currentTimeMillis() / 1000L;
FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder()
- .setMtime(now)
- .setUid(uid)
- .setGid(gid)
- .setUserName(userName)
- .clearGroupName()
- .addAllGroupName(Arrays.asList(groupNames));
+ .setMtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames));
return updateEntry(parent, entry.toBuilder().setAttributes(attr).build());
}
@@ -105,17 +120,17 @@ public class FilerClient {
long now = System.currentTimeMillis() / 1000L;
return FilerProto.Entry.newBuilder()
- .setName(name)
- .setIsDirectory(true)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setMtime(now)
- .setCrtime(now)
- .setUid(uid)
- .setGid(gid)
- .setFileMode(mode | 1 << 31)
- .setUserName(userName)
- .clearGroupName()
- .addAllGroupName(Arrays.asList(groupNames)));
+ .setName(name)
+ .setIsDirectory(true)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setFileMode(mode | 1 << 31)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames)));
}
public FilerProto.Entry.Builder newFileEntry(String name, int mode,
@@ -124,17 +139,17 @@ public class FilerClient {
long now = System.currentTimeMillis() / 1000L;
return FilerProto.Entry.newBuilder()
- .setName(name)
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setMtime(now)
- .setCrtime(now)
- .setUid(uid)
- .setGid(gid)
- .setFileMode(mode)
- .setUserName(userName)
- .clearGroupName()
- .addAllGroupName(Arrays.asList(groupNames)));
+ .setName(name)
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setMtime(now)
+ .setCrtime(now)
+ .setUid(uid)
+ .setGid(gid)
+ .setFileMode(mode)
+ .setUserName(userName)
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(groupNames)));
}
public List<FilerProto.Entry> listEntries(String path) {
@@ -159,22 +174,35 @@ public class FilerClient {
}
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
- return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
- .setDirectory(path)
- .setPrefix(entryPrefix)
- .setStartFromFileName(lastEntryName)
- .setLimit(limit)
- .build()).getEntriesList();
+ Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ .setDirectory(path)
+ .setPrefix(entryPrefix)
+ .setStartFromFileName(lastEntryName)
+ .setLimit(limit)
+ .build());
+ List<FilerProto.Entry> entries = new ArrayList<>();
+ while (iter.hasNext()) {
+ FilerProto.ListEntriesResponse resp = iter.next();
+ entries.add(fixEntryAfterReading(resp.getEntry()));
+ }
+ return entries;
}
public FilerProto.Entry lookupEntry(String directory, String entryName) {
try {
- return filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
- FilerProto.LookupDirectoryEntryRequest.newBuilder()
- .setDirectory(directory)
- .setName(entryName)
- .build()).getEntry();
+ FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
+ FilerProto.LookupDirectoryEntryRequest.newBuilder()
+ .setDirectory(directory)
+ .setName(entryName)
+ .build()).getEntry();
+ if (entry == null) {
+ return null;
+ }
+ return fixEntryAfterReading(entry);
} catch (Exception e) {
+ if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
+ return null;
+ }
LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
return null;
}
@@ -184,9 +212,9 @@ public class FilerClient {
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
@@ -197,9 +225,9 @@ public class FilerClient {
public boolean updateEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
@@ -207,14 +235,15 @@ public class FilerClient {
return true;
}
- public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) {
+ public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecursiveError) {
try {
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
- .setDirectory(parent)
- .setName(entryName)
- .setIsDeleteData(isDeleteFileChunk)
- .setIsRecursive(isRecursive)
- .build());
+ .setDirectory(parent)
+ .setName(entryName)
+ .setIsDeleteData(isDeleteFileChunk)
+ .setIsRecursive(isRecursive)
+ .setIgnoreRecursiveError(ignoreRecursiveError)
+ .build());
} catch (Exception e) {
LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e);
return false;
@@ -222,4 +251,39 @@ public class FilerClient {
return true;
}
+ public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
+ try {
+ filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
+ .setOldDirectory(oldParent)
+ .setOldName(oldName)
+ .setNewDirectory(newParent)
+ .setNewName(newName)
+ .build());
+ } catch (Exception e) {
+ LOG.warn("atomicRenameEntry {}/{} => {}/{}: {}", oldParent, oldName, newParent, newName, e);
+ return false;
+ }
+ return true;
+ }
+
+ private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ FilerProto.FileId fid = chunk.getFid();
+ fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ chunkBuilder.setFileId(fileId);
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
+ }
+
}
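rm gained an ignoreRecursiveError flag, and the new mv delegates to the AtomicRenameEntry RPC so a rename is a single filer-side operation rather than a copy plus delete. A short usage sketch against a filer assumed to listen on localhost:18888 (mirroring SeaweedFilerTest below):

```java
import seaweedfs.client.FilerClient;

public class FilerClientExample {
    public static void main(String[] args) {
        FilerClient filerClient = new FilerClient("localhost", 18888);
        filerClient.mkdirs("/demo", 0755);
        filerClient.touch("/demo/a.txt", 0644);
        filerClient.mv("/demo/a.txt", "/demo/b.txt"); // atomic rename on the filer
        filerClient.rm("/demo", true, true);          // recursive, ignoring recursion errors
    }
}
```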
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 16b7c3249..3f5d1e8e9 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -2,22 +2,55 @@ package seaweedfs.client;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLException;
import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
public class FilerGrpcClient {
- private static final Logger logger = Logger.getLogger(FilerGrpcClient.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
+ static SslContext sslContext;
+
+ static {
+ try {
+ sslContext = FilerSslContext.loadSslContext();
+ } catch (SSLException e) {
+ logger.warn("failed to load ssl context", e);
+ }
+ }
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
-
+ private boolean cipher = false;
+ private String collection = "";
+ private String replication = "";
public FilerGrpcClient(String host, int grpcPort) {
- this(ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext());
+ this(host, grpcPort, sslContext);
+ }
+
+ public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
+
+ this(sslContext == null ?
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ NettyChannelBuilder.forAddress(host, grpcPort)
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(sslContext));
+
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
+ this.getBlockingStub().getFilerConfiguration(
+ FilerProto.GetFilerConfigurationRequest.newBuilder().build());
+ cipher = filerConfigurationResponse.getCipher();
+ collection = filerConfigurationResponse.getCollection();
+ replication = filerConfigurationResponse.getReplication();
+
}
public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
@@ -27,6 +60,18 @@ public class FilerGrpcClient {
futureStub = SeaweedFilerGrpc.newFutureStub(channel);
}
+ public boolean isCipher() {
+ return cipher;
+ }
+
+ public String getCollection() {
+ return collection;
+ }
+
+ public String getReplication() {
+ return replication;
+ }
+
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
@@ -42,4 +87,5 @@ public class FilerGrpcClient {
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
return futureStub;
}
+
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java b/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java
new file mode 100644
index 000000000..5a88c1da3
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerSslContext.java
@@ -0,0 +1,64 @@
+package seaweedfs.client;
+
+import com.google.common.base.Strings;
+import com.moandjiezana.toml.Toml;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import java.io.File;
+
+public class FilerSslContext {
+
+ private static final Logger logger = LoggerFactory.getLogger(FilerSslContext.class);
+
+ public static SslContext loadSslContext() throws SSLException {
+ String securityFileName = "security.toml";
+ String home = System.getProperty("user.home");
+ File f1 = new File("./"+securityFileName);
+ File f2 = new File(home + "/.seaweedfs/"+securityFileName);
+ File f3 = new File(home + "/etc/seaweedfs/"+securityFileName);
+
+ File securityFile = f1.exists()? f1 : f2.exists() ? f2 : f3.exists()? f3 : null;
+
+ if (securityFile==null){
+ return null;
+ }
+
+ Toml toml = new Toml().read(securityFile);
+ logger.debug("reading ssl setup from {}", securityFile);
+
+ String trustCertCollectionFilePath = toml.getString("grpc.ca");
+ logger.debug("loading ca from {}", trustCertCollectionFilePath);
+ String clientCertChainFilePath = toml.getString("grpc.client.cert");
+ logger.debug("loading client ca from {}", clientCertChainFilePath);
+ String clientPrivateKeyFilePath = toml.getString("grpc.client.key");
+ logger.debug("loading client key from {}", clientPrivateKeyFilePath);
+
+ if (Strings.isNullOrEmpty(clientCertChainFilePath) && Strings.isNullOrEmpty(clientPrivateKeyFilePath)) {
+ return null;
+ }
+
+ // possibly fix the format https://netty.io/wiki/sslcontextbuilder-and-private-key.html
+
+ return buildSslContext(trustCertCollectionFilePath, clientCertChainFilePath, clientPrivateKeyFilePath);
+ }
+
+
+ private static SslContext buildSslContext(String trustCertCollectionFilePath,
+ String clientCertChainFilePath,
+ String clientPrivateKeyFilePath) throws SSLException {
+ SslContextBuilder builder = GrpcSslContexts.forClient();
+ if (trustCertCollectionFilePath != null) {
+ builder.trustManager(new File(trustCertCollectionFilePath));
+ }
+ if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) {
+ builder.keyManager(new File(clientCertChainFilePath), new File(clientPrivateKeyFilePath));
+ }
+ return builder.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+ }
+}
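FilerSslContext looks for security.toml in the working directory, ~/.seaweedfs/, and ~/etc/seaweedfs/, and reads the grpc.ca, grpc.client.cert, and grpc.client.key paths from it; FilerGrpcClient loads the result once in a static initializer and falls back to plaintext when no file is found. You can also build the SslContext yourself and pass it to the new three-argument constructor; a sketch with hypothetical certificate paths:

```java
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import seaweedfs.client.FilerGrpcClient;

import java.io.File;

public class TlsFilerClientExample {
    public static void main(String[] args) throws Exception {
        // hypothetical PEM locations; substitute the paths from your security.toml
        SslContext ssl = GrpcSslContexts.forClient()
                .trustManager(new File("/etc/seaweedfs/ca.pem"))
                .keyManager(new File("/etc/seaweedfs/client.pem"),
                        new File("/etc/seaweedfs/client.key"))
                .build();
        FilerGrpcClient client = new FilerGrpcClient("localhost", 18888, ssl);
        client.shutdown();
    }
}
```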
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
new file mode 100644
index 000000000..248285dd3
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -0,0 +1,37 @@
+package seaweedfs.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class Gzip {
+ public static byte[] compress(byte[] data) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
+ GZIPOutputStream gzip = new GZIPOutputStream(bos);
+ gzip.write(data);
+ gzip.close();
+ byte[] compressed = bos.toByteArray();
+ bos.close();
+ return compressed;
+ }
+
+ public static byte[] decompress(byte[] compressed) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ return readAll(gis);
+ }
+
+ private static byte[] readAll(InputStream input) throws IOException {
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[4096];
+ int n;
+ while (-1 != (n = input.read(buffer))) {
+ output.write(buffer, 0, n);
+ }
+ return output.toByteArray();
+ }
+ }
+}
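Gzip is used on the read path to inflate chunks stored compressed (the isGzipped flag on a chunk view): compress funnels the payload through a GZIPOutputStream, and decompress drains a GZIPInputStream back into memory. A round-trip sanity check:

```java
import seaweedfs.client.Gzip;

import java.nio.charset.StandardCharsets;

public class GzipExample {
    public static void main(String[] args) throws Exception {
        byte[] original = "hello seaweedfs".getBytes(StandardCharsets.UTF_8);
        byte[] packed = Gzip.compress(original);
        byte[] unpacked = Gzip.decompress(packed);
        System.out.println(new String(unpacked, StandardCharsets.UTF_8)); // hello seaweedfs
    }
}
```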
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
new file mode 100644
index 000000000..8d0ebd755
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
@@ -0,0 +1,55 @@
+package seaweedfs.client;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.security.SecureRandom;
+
+public class SeaweedCipher {
+ // AES-GCM parameters
+ public static final int AES_KEY_SIZE = 256; // in bits
+ public static final int GCM_NONCE_LENGTH = 12; // in bytes
+ public static final int GCM_TAG_LENGTH = 16; // in bytes
+
+ private static SecureRandom random = new SecureRandom();
+
+ public static byte[] genCipherKey() throws Exception {
+ byte[] key = new byte[AES_KEY_SIZE / 8];
+ random.nextBytes(key);
+ return key;
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception {
+ return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey);
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception {
+
+ final byte[] nonce = new byte[GCM_NONCE_LENGTH];
+ random.nextBytes(nonce);
+ GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+
+ Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec);
+
+ byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length);
+
+ byte[] iv = AES_cipherInstance.getIV();
+ byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH];
+ System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length);
+
+ return message;
+ }
+
+ public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception {
+ final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+ AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params);
+ byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH);
+ return decryptedText;
+ }
+
+}
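SeaweedCipher implements AES-256-GCM with a fresh random 12-byte nonce per message: encrypt emits the nonce, then the ciphertext, then the 16-byte authentication tag that doFinal appends, and decrypt reads the nonce back off the front of the same buffer. A round-trip sketch over a whole buffer (offset 0, full length):

```java
import seaweedfs.client.SeaweedCipher;

import java.nio.charset.StandardCharsets;

public class CipherExample {
    public static void main(String[] args) throws Exception {
        byte[] key = SeaweedCipher.genCipherKey();            // 32 random bytes = AES-256
        byte[] plain = "attack at dawn".getBytes(StandardCharsets.UTF_8);

        byte[] message = SeaweedCipher.encrypt(plain, key);
        // wire layout: [12-byte nonce][ciphertext][16-byte GCM tag]
        System.out.println(message.length == 12 + plain.length + 16); // true

        byte[] roundTrip = SeaweedCipher.decrypt(message, key);
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8)); // attack at dawn
    }
}
```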
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index a906a689b..7be39da53 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -5,25 +5,25 @@ import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class SeaweedRead {
- // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
+ static ChunkCache chunkCache = new ChunkCache(1000);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) {
+ final int bufferLength) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
@@ -34,7 +34,7 @@ public class SeaweedRead {
}
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
+ .getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
@@ -58,35 +58,64 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) {
- HttpClient client = HttpClientBuilder.create().build();
- HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
- if (!chunkView.isFullChunk) {
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
- request.setHeader(HttpHeaders.RANGE,
- String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
+ if (chunkData == null) {
+ chunkData = doFetchFullChunkData(chunkView, locations);
}
+ int len = (int) chunkView.size;
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
+ System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
+
+ chunkCache.setChunk(chunkView.fileId, chunkData);
+
+ return len;
+ }
+
+ private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ HttpClient client = new DefaultHttpClient();
+ HttpGet request = new HttpGet(
+ String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+
+ byte[] data = null;
+
try {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
- int len = (int) (chunkView.logicOffset - position + chunkView.size);
- OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
- entity.writeTo(outputStream);
- // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
+ data = EntityUtils.toByteArray(entity);
- return len;
+ } finally {
+ if (client instanceof Closeable) {
+ Closeable t = (Closeable) client;
+ t.close();
+ }
+ }
- } catch (IOException e) {
- e.printStackTrace();
+ if (chunkView.isGzipped) {
+ data = Gzip.decompress(data);
}
- return 0;
+
+ if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
+ try {
+ data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
+ } catch (Exception e) {
+ throw new IOException("fail to decrypt", e);
+ }
+ }
+
+ return data;
+
}
- public static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
+ protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
List<ChunkView> views = new ArrayList<>();
long stop = offset + size;
@@ -94,11 +123,13 @@ public class SeaweedRead {
if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
- chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
- isFullChunk
+ chunk.fileId,
+ offset - chunk.start,
+ Math.min(chunk.stop, stop) - offset,
+ offset,
+ isFullChunk,
+ chunk.cipherKey,
+ chunk.isGzipped
));
offset = Math.min(chunk.stop, stop);
}
@@ -128,11 +159,13 @@ public class SeaweedRead {
List<VisibleInterval> newVisibles,
FilerProto.FileChunk chunk) {
VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- true
+ chunk.getOffset(),
+ chunk.getOffset() + chunk.getSize(),
+ chunk.getFileId(),
+ chunk.getMtime(),
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsGzipped()
);
// easy cases to speed up
@@ -148,21 +181,25 @@ public class SeaweedRead {
for (VisibleInterval v : visibles) {
if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- false
+ v.start,
+ chunk.getOffset(),
+ v.fileId,
+ v.modifiedTime,
+ false,
+ v.cipherKey,
+ v.isGzipped
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
if (v.start < chunkStop && chunkStop < v.stop) {
newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false
+ chunkStop,
+ v.stop,
+ v.fileId,
+ v.modifiedTime,
+ false,
+ v.cipherKey,
+ v.isGzipped
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -209,24 +246,30 @@ public class SeaweedRead {
public final long modifiedTime;
public final String fileId;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isGzipped;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isGzipped = isGzipped;
}
@Override
public String toString() {
return "VisibleInterval{" +
- "start=" + start +
- ", stop=" + stop +
- ", modifiedTime=" + modifiedTime +
- ", fileId='" + fileId + '\'' +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "start=" + start +
+ ", stop=" + stop +
+ ", modifiedTime=" + modifiedTime +
+ ", fileId='" + fileId + '\'' +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isGzipped=" + isGzipped +
+ '}';
}
}
@@ -236,24 +279,30 @@ public class SeaweedRead {
public final long size;
public final long logicOffset;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isGzipped;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isGzipped = isGzipped;
}
@Override
public String toString() {
return "ChunkView{" +
- "fileId='" + fileId + '\'' +
- ", offset=" + offset +
- ", size=" + size +
- ", logicOffset=" + logicOffset +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "fileId='" + fileId + '\'' +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", logicOffset=" + logicOffset +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isGzipped=" + isGzipped +
+ '}';
}
}
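The read path resolves overlapping chunks into newest-writer-wins VisibleIntervals and then clips a requested [offset, offset+size) range into ChunkViews; each view now also carries the cipherKey and isGzipped metadata needed to decode the fetched bytes. A sketch of the clipping (placed in package seaweedfs.client, since viewFromVisibles is protected; the fileId is illustrative):

```java
package seaweedfs.client;

import java.util.Collections;
import java.util.List;

public class ChunkViewExample {
    public static void main(String[] args) {
        // one full chunk covering bytes [0, 100) of the file, unencrypted, not gzipped
        SeaweedRead.VisibleInterval v = new SeaweedRead.VisibleInterval(
                0, 100, "3,01637037d6", 1L, true, new byte[0], false);

        // read 100 bytes starting at offset 50: clipped to the 50 bytes that exist
        List<SeaweedRead.ChunkView> views =
                SeaweedRead.viewFromVisibles(Collections.singletonList(v), 50, 100);

        // prints one view: offset=50, size=50, logicOffset=50, isFullChunk=false
        views.forEach(System.out::println);
    }
}
```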
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index a7cede09f..18ec77b76 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,18 +1,23 @@
package seaweedfs.client;
+import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.DefaultHttpClient;
import java.io.ByteArrayInputStream;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.security.SecureRandom;
public class SeaweedWrite {
+ private static SecureRandom random = new SecureRandom();
+
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
final FilerGrpcClient filerGrpcClient,
@@ -20,53 +25,83 @@ public class SeaweedWrite {
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
- FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection("")
- .setReplication(replication)
- .setDataCenter("")
- .setReplication("")
- .setTtlSec(0)
- .build());
+ FilerProto.AssignVolumeRequest.newBuilder()
+ .setCollection(filerGrpcClient.getCollection())
+ .setReplication(replication == null ? filerGrpcClient.getReplication() : replication)
+ .setDataCenter("")
+ .setTtlSec(0)
+ .build());
String fileId = response.getFileId();
String url = response.getUrl();
+ String auth = response.getAuth();
String targetUrl = String.format("http://%s/%s", url, fileId);
- String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength);
+ ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
+ byte[] cipherKey = null;
+ if (filerGrpcClient.isCipher()) {
+ cipherKey = genCipherKey();
+ cipherKeyString = ByteString.copyFrom(cipherKey);
+ }
+
+ String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
+
+ synchronized (entry) {
+ entry.addChunks(FilerProto.FileChunk.newBuilder()
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+ .setMtime(System.currentTimeMillis() / 10000L)
+ .setETag(etag)
+ .setCipherKey(cipherKeyString)
+ );
+ }
- entry.addChunks(FilerProto.FileChunk.newBuilder()
- .setFileId(fileId)
- .setOffset(offset)
- .setSize(bytesLength)
- .setMtime(System.currentTimeMillis() / 10000L)
- .setETag(etag)
- );
+ // cache fileId ~ bytes
+ SeaweedRead.chunkCache.setChunk(fileId, bytes);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory, final FilerProto.Entry.Builder entry) {
- filerGrpcClient.getBlockingStub().createEntry(
- FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parentDirectory)
- .setEntry(entry)
- .build()
- );
+ synchronized (entry) {
+ filerGrpcClient.getBlockingStub().createEntry(
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parentDirectory)
+ .setEntry(entry)
+ .build()
+ );
+ }
}
private static String multipartUpload(String targetUrl,
+ String auth,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
-
- CloseableHttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build();
-
- InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
+
+ HttpClient client = new DefaultHttpClient();
+
+ InputStream inputStream = null;
+ if (cipherKey == null || cipherKey.length == 0) {
+ inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ } else {
+ try {
+ byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey);
+ inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length);
+ } catch (Exception e) {
+ throw new IOException("fail to encrypt data", e);
+ }
+ }
HttpPost post = new HttpPost(targetUrl);
+ if (auth != null && auth.length() != 0) {
+ post.addHeader("Authorization", "BEARER " + auth);
+ }
post.setEntity(MultipartEntityBuilder.create()
- .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
- .addBinaryBody("upload", inputStream)
- .build());
+ .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+ .addBinaryBody("upload", inputStream)
+ .build());
try {
HttpResponse response = client.execute(post);
@@ -79,8 +114,17 @@ public class SeaweedWrite {
return etag;
} finally {
- client.close();
+ if (client instanceof Closeable) {
+ Closeable t = (Closeable) client;
+ t.close();
+ }
}
}
+
+ private static byte[] genCipherKey() {
+ byte[] b = new byte[32];
+ random.nextBytes(b);
+ return b;
+ }
}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 6cd4df6b4..37121f29c 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
package filer_pb;
+option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "FilerProto";
@@ -12,7 +13,7 @@ service SeaweedFiler {
rpc LookupDirectoryEntry (LookupDirectoryEntryRequest) returns (LookupDirectoryEntryResponse) {
}
- rpc ListEntries (ListEntriesRequest) returns (ListEntriesResponse) {
+ rpc ListEntries (ListEntriesRequest) returns (stream ListEntriesResponse) {
}
rpc CreateEntry (CreateEntryRequest) returns (CreateEntryResponse) {
@@ -21,9 +22,15 @@ service SeaweedFiler {
rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) {
}
+ rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) {
+ }
+
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
}
+ rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) {
+ }
+
rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) {
}
@@ -36,6 +43,21 @@ service SeaweedFiler {
rpc Statistics (StatisticsRequest) returns (StatisticsResponse) {
}
+ rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
+ }
+
+ rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
+ }
+
+ rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -58,7 +80,7 @@ message ListEntriesRequest {
}
message ListEntriesResponse {
- repeated Entry entries = 1;
+ Entry entry = 1;
}
message Entry {
@@ -69,19 +91,36 @@ message Entry {
map<string, bytes> extended = 5;
}
+message FullEntry {
+ string dir = 1;
+ Entry entry = 2;
+}
+
message EventNotification {
Entry old_entry = 1;
Entry new_entry = 2;
bool delete_chunks = 3;
+ string new_parent_path = 4;
+ bool is_from_other_cluster = 5;
}
message FileChunk {
- string file_id = 1;
+ string file_id = 1; // to be deprecated
int64 offset = 2;
uint64 size = 3;
int64 mtime = 4;
string e_tag = 5;
- string source_file_id = 6;
+ string source_file_id = 6; // to be deprecated
+ FileId fid = 7;
+ FileId source_fid = 8;
+ bytes cipher_key = 9;
+ bool is_compressed = 10;
+}
+
+message FileId {
+ uint32 volume_id = 1;
+ uint64 file_key = 2;
+ fixed32 cookie = 3;
}
message FuseAttributes {
@@ -98,32 +137,58 @@ message FuseAttributes {
string user_name = 11; // for hdfs
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
+ bytes md5 = 14;
}
message CreateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool o_excl = 3;
+ bool is_from_other_cluster = 4;
}
message CreateEntryResponse {
+ string error = 1;
}
message UpdateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool is_from_other_cluster = 3;
}
message UpdateEntryResponse {
}
+message AppendToEntryRequest {
+ string directory = 1;
+ string entry_name = 2;
+ repeated FileChunk chunks = 3;
+}
+message AppendToEntryResponse {
+}
+
message DeleteEntryRequest {
string directory = 1;
string name = 2;
// bool is_directory = 3;
bool is_delete_data = 4;
bool is_recursive = 5;
+ bool ignore_recursive_error = 6;
+ bool is_from_other_cluster = 7;
}
message DeleteEntryResponse {
+ string error = 1;
+}
+
+message AtomicRenameEntryRequest {
+ string old_directory = 1;
+ string old_name = 2;
+ string new_directory = 3;
+ string new_name = 4;
+}
+
+message AtomicRenameEntryResponse {
}
message AssignVolumeRequest {
@@ -132,6 +197,7 @@ message AssignVolumeRequest {
string replication = 3;
int32 ttl_sec = 4;
string data_center = 5;
+ string parent_path = 6;
}
message AssignVolumeResponse {
@@ -139,6 +205,10 @@ message AssignVolumeResponse {
string url = 2;
string public_url = 3;
int32 count = 4;
+ string auth = 5;
+ string collection = 6;
+ string replication = 7;
+ string error = 8;
}
message LookupVolumeRequest {
@@ -177,3 +247,53 @@ message StatisticsResponse {
uint64 used_size = 5;
uint64 file_count = 6;
}
+
+message GetFilerConfigurationRequest {
+}
+message GetFilerConfigurationResponse {
+ repeated string masters = 1;
+ string replication = 2;
+ string collection = 3;
+ uint32 max_mb = 4;
+ string dir_buckets = 5;
+ bool cipher = 7;
+}
+
+message SubscribeMetadataRequest {
+ string client_name = 1;
+ string path_prefix = 2;
+ int64 since_ns = 3;
+}
+message SubscribeMetadataResponse {
+ string directory = 1;
+ EventNotification event_notification = 2;
+ int64 ts_ns = 3;
+}
+
+message LogEntry {
+ int64 ts_ns = 1;
+ int32 partition_key_hash = 2;
+ bytes data = 3;
+}
+
+message KeepConnectedRequest {
+ string name = 1;
+ uint32 grpc_port = 2;
+ repeated string resources = 3;
+}
+message KeepConnectedResponse {
+}
+
+message LocateBrokerRequest {
+ string resource = 1;
+}
+message LocateBrokerResponse {
+ bool found = 1;
+ // if found, send the exact address
+ // if not found, send the full list of existing brokers
+ message Resource {
+ string grpc_addresses = 1;
+ int32 resource_count = 2;
+ }
+ repeated Resource resources = 2;
+}
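ListEntries switching from a unary call with repeated entries to a server stream with one entry per response keeps large directories from being buffered into a single message; grpc-java surfaces the stream on the blocking stub as an Iterator, which FilerClient.listEntries drains (see the FilerClient diff above). A sketch of consuming the stream directly, against a filer assumed on localhost:18888:

```java
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;

import java.util.Iterator;

public class ListEntriesStreamExample {
    public static void main(String[] args) throws Exception {
        FilerGrpcClient client = new FilerGrpcClient("localhost", 18888);
        Iterator<FilerProto.ListEntriesResponse> iter = client.getBlockingStub()
                .listEntries(FilerProto.ListEntriesRequest.newBuilder()
                        .setDirectory("/")
                        .setLimit(100)
                        .build());
        while (iter.hasNext()) {
            System.out.println(iter.next().getEntry().getName()); // one entry per message
        }
        client.shutdown();
    }
}
```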
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
new file mode 100644
index 000000000..7b5e53e19
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
@@ -0,0 +1,42 @@
+package seaweedfs.client;
+
+import org.junit.Test;
+
+import java.util.Base64;
+
+import static seaweedfs.client.SeaweedCipher.decrypt;
+import static seaweedfs.client.SeaweedCipher.encrypt;
+
+public class SeaweedCipherTest {
+
+ @Test
+ public void testSameAsGoImplementation() throws Exception {
+ byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+ @Test
+ public void testEncryptDecrypt() throws Exception {
+ byte[] secretKey = SeaweedCipher.genCipherKey();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java
new file mode 100644
index 000000000..eaf17e5c6
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java
@@ -0,0 +1,23 @@
+package seaweedfs.client;
+
+import java.util.List;
+
+public class SeaweedFilerTest {
+ public static void main(String[] args){
+
+ FilerClient filerClient = new FilerClient("localhost", 18888);
+
+ List<FilerProto.Entry> entries = filerClient.listEntries("/");
+
+ for (FilerProto.Entry entry : entries) {
+ System.out.println(entry.toString());
+ }
+
+ filerClient.mkdirs("/new_folder", 0755);
+ filerClient.touch("/new_folder/new_empty_file", 0755);
+ filerClient.touch("/new_folder/new_empty_file2", 0755);
+ filerClient.rm("/new_folder/new_empty_file", false, true);
+ filerClient.rm("/new_folder", true, true);
+
+ }
+}
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
new file mode 100644
index 000000000..53fb62186
--- /dev/null
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>oss-parent</artifactId>
+ <groupId>org.sonatype.oss</groupId>
+ <version>9</version>
+ <relativePath>../pom.xml/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-hadoop2-client</artifactId>
+ <version>${seaweedfs.client.version}</version>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>7</source>
+ <target>7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>org/slf4j/**</exclude>
+ <exclude>META-INF/maven/org.slf4j/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer />
+ </transformers>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc.internal</pattern>
+ <shadedPattern>shaded.io.grpc.internal</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>shaded.org.apache.commons</shadedPattern>
+ <excludes>
+ <exclude>org.apache.hadoop</exclude>
+ <exclude>org.apache.log4j</exclude>
+ </excludes>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>shaded.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <properties>
+ <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <hadoop.version>2.9.2</hadoop.version>
+ </properties>
+</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
new file mode 100644
index 000000000..0d5b138d5
--- /dev/null
+++ b/other/java/hdfs2/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <hadoop.version>2.9.2</hadoop.version>
+ </properties>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-hadoop2-client</artifactId>
+ <version>${seaweedfs.client.version}</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>7</source>
+ <target>7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>org/slf4j/**</exclude>
+ <exclude>META-INF/maven/org.slf4j/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc.internal</pattern>
+ <shadedPattern>shaded.io.grpc.internal</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>shaded.org.apache.commons</shadedPattern>
+ <excludes>
+ <exclude>org.apache.hadoop</exclude>
+ <exclude>org.apache.log4j</exclude>
+ </excludes>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>shaded.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>${seaweedfs.client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
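As a usage sketch for the module above: a Hadoop client typically maps the seaweedfs:// scheme to this filesystem in its Configuration. The property key and URI form below follow the SeaweedFS documentation; the filer host and port are illustrative.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SeaweedFsWiring {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Map the seaweedfs:// scheme to the implementation in this module.
            conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
            // Filer address is illustrative; point it at your own deployment.
            FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
            fs.mkdirs(new Path("/buckets/demo"));
        }
    }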
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
index 926d0b83b..926d0b83b 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java
index 5b1e21529..5b1e21529 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java
index d63674977..d63674977 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java
index 6ffbc4644..6ffbc4644 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 2a0ef78af..d471d8440 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -1,14 +1,7 @@
package seaweed.hdfs;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -73,6 +66,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
this.uri = uri;
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
+
}
@Override
@@ -86,6 +80,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
return new FSDataInputStream(inputStream);
} catch (Exception ex) {
+ LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
}
}
@@ -103,10 +98,36 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
+ LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
return null;
}
}
+ /**
+ * {@inheritDoc}
+ * @throws FileNotFoundException if the parent directory is not present or
+ * is not a directory.
+ */
+ @Override
+ public FSDataOutputStream createNonRecursive(Path path,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ Path parent = path.getParent();
+ if (parent != null) {
+ // expect this to raise an exception if there is no parent
+ if (!getFileStatus(parent).isDirectory()) {
+ throw new FileAlreadyExistsException("Not a directory: " + parent);
+ }
+ }
+ return create(path, permission,
+ flags.contains(CreateFlag.OVERWRITE), bufferSize,
+ replication, blockSize, progress);
+ }
+
@Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
@@ -117,6 +138,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
+ LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
return null;
}
}
@@ -206,8 +228,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
- fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
- FsPermission.getUMask(getConf()));
+ fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
+ FsPermission.getUMask(getConf()));
}
@@ -238,7 +260,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*/
@Override
public void setOwner(Path path, final String owner, final String group)
- throws IOException {
+ throws IOException {
LOG.debug("setOwner path: {}", path);
path = qualify(path);
@@ -271,54 +293,55 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/**
* Concat existing files together.
- * @param trg the path to the target destination.
+ *
+ * @param trg the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default).
+ * (default).
*/
@Override
- public void concat(final Path trg, final Path [] psrcs) throws IOException {
+ public void concat(final Path trg, final Path[] psrcs) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
+ getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Truncate the file in the indicated path to the indicated size.
* <ul>
- * <li>Fails if path is a directory.</li>
- * <li>Fails if path does not exist.</li>
- * <li>Fails if path is not closed.</li>
- * <li>Fails if new size is greater than current size.</li>
+ * <li>Fails if path is a directory.</li>
+ * <li>Fails if path does not exist.</li>
+ * <li>Fails if path is not closed.</li>
+ * <li>Fails if new size is greater than current size.</li>
* </ul>
- * @param f The path to the file to be truncated
- * @param newLength The size the file is to be truncated to
*
+ * @param f The path to the file to be truncated
+ * @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default).
+ * (default).
*/
@Override
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
+ getClass().getSimpleName() + " FileSystem implementation");
}
@Override
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
- IOException {
+ FileAlreadyExistsException, FileNotFoundException,
+ ParentNotDirectoryException, UnsupportedFileSystemException,
+ IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
- "Filesystem does not support symlinks!");
+ "Filesystem does not support symlinks!");
}
public boolean supportsSymlinks() {
@@ -327,48 +350,51 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
/**
* Create a snapshot.
- * @param path The directory where snapshots will be taken.
+ *
+ * @param path The directory where snapshots will be taken.
* @param snapshotName The name of the snapshot
* @return the snapshot path.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
*/
@Override
public Path createSnapshot(Path path, String snapshotName)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support createSnapshot");
+ + " doesn't support createSnapshot");
}
/**
* Rename a snapshot.
- * @param path The directory path where the snapshot was taken
+ *
+ * @param path The directory path where the snapshot was taken
* @param snapshotOldName Old name of the snapshot
* @param snapshotNewName New name of the snapshot
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support renameSnapshot");
+ + " doesn't support renameSnapshot");
}
/**
* Delete a snapshot of a directory.
- * @param path The directory that the to-be-deleted snapshot belongs to
+ *
+ * @param path The directory that the to-be-deleted snapshot belongs to
* @param snapshotName The name of the snapshot
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void deleteSnapshot(Path path, String snapshotName)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support deleteSnapshot");
+ + " doesn't support deleteSnapshot");
}
/**
@@ -377,49 +403,49 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
- * @param path Path to modify
+ * @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
- * @throws IOException if an ACL could not be modified
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support modifyAclEntries");
+ + " doesn't support modifyAclEntries");
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
- * @param path Path to modify
+ * @param path Path to modify
* @param aclSpec List describing entries to remove
- * @throws IOException if an ACL could not be modified
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAclEntries");
+ + " doesn't support removeAclEntries");
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
- * @throws IOException if an ACL could not be modified
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeDefaultAcl(Path path)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeDefaultAcl");
+ + " doesn't support removeDefaultAcl");
}
/**
@@ -428,32 +454,32 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* bits.
*
* @param path Path to modify
- * @throws IOException if an ACL could not be removed
+ * @throws IOException if an ACL could not be removed
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeAcl(Path path)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAcl");
+ + " doesn't support removeAcl");
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
- * @param path Path to modify
+ * @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
- * for user, group, and others for compatibility with permission bits.
- * @throws IOException if an ACL could not be modified
+ * for user, group, and others for compatibility with permission bits.
+ * @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setAcl");
+ + " doesn't support setAcl");
}
/**
@@ -461,14 +487,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to get
* @return AclStatus describing the ACL of the file or directory
- * @throws IOException if an ACL could not be read
+ * @throws IOException if an ACL could not be read
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public AclStatus getAclStatus(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getAclStatus");
+ + " doesn't support getAclStatus");
}
/**
@@ -478,19 +504,19 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
- * @param path Path to modify
- * @param name xattr name.
+ * @param path Path to modify
+ * @param name xattr name.
* @param value xattr value.
- * @param flag xattr set flag
- * @throws IOException IO failure
+ * @param flag xattr set flag
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setXAttr");
+ + " doesn't support setXAttr");
}
/**
@@ -503,14 +529,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* @param path Path to get extended attribute
* @param name xattr name.
* @return byte[] xattr value.
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttr");
+ + " doesn't support getXAttr");
}
/**
@@ -522,14 +548,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to get extended attributes
* @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
+ + " doesn't support getXAttrs");
}
/**
@@ -539,18 +565,18 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
- * @param path Path to get extended attributes
+ * @param path Path to get extended attributes
* @param names XAttr names.
* @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
+ + " doesn't support getXAttrs");
}
/**
@@ -562,14 +588,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to get extended attributes
* @return List{@literal <String>} of the XAttr names of the file or directory
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public List<String> listXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support listXAttrs");
+ + " doesn't support listXAttrs");
}
/**
@@ -581,14 +607,14 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
*
* @param path Path to remove extended attribute
* @param name xattr name
- * @throws IOException IO failure
+ * @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
+ * (default outcome).
*/
@Override
public void removeXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeXAttr");
+ + " doesn't support removeXAttr");
}
}
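The createNonRecursive override added in this file validates the parent itself instead of creating missing directories. A hedged caller-side sketch, with illustrative path and sizing values:

    // Assumes fs is an initialized SeaweedFileSystem; all values are illustrative.
    FSDataOutputStream out = fs.createNonRecursive(
            new Path("/logs/2020/01/app.log"),
            FsPermission.getFileDefault(),
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
            4096,        // bufferSize
            (short) 1,   // replication
            64L << 20,   // blockSize
            null);       // progress callback
    out.close();
    // Fails with FileNotFoundException if /logs/2020/01 is absent, or with
    // FileAlreadyExistsException if that path exists but is not a directory.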
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 27678e615..9617a38be 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -64,6 +64,16 @@ public class SeaweedFileSystemStore {
public FileStatus[] listEntries(final Path path) {
LOG.debug("listEntries path: {}", path);
+ FileStatus pathStatus = getFileStatus(path);
+
+ if (pathStatus == null) {
+ return new FileStatus[0];
+ }
+
+ if (!pathStatus.isDirectory()) {
+ return new FileStatus[]{pathStatus};
+ }
+
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
@@ -74,7 +84,9 @@ public class SeaweedFileSystemStore {
fileStatuses.add(fileStatus);
}
+ LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
return fileStatuses.toArray(new FileStatus[0]);
+
}
public FileStatus getFileStatus(final Path path) {
@@ -106,7 +118,7 @@ public class SeaweedFileSystemStore {
}
}
- return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive);
+ return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive, true);
}
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
@@ -137,41 +149,13 @@ public class SeaweedFileSystemStore {
if (source.isRoot()) {
return;
}
- LOG.warn("rename lookupEntry source: {}", source);
+ LOG.info("rename source: {} destination:{}", source, destination);
FilerProto.Entry entry = lookupEntry(source);
if (entry == null) {
LOG.warn("rename non-existing source: {}", source);
return;
}
- LOG.warn("rename moveEntry source: {}", source);
- moveEntry(source.getParent(), entry, destination);
- }
-
- private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) {
-
- LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination);
-
- FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
- boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build());
-
- if (!isDirectoryCreated) {
- return false;
- }
-
- if (entry.getIsDirectory()) {
- Path entryPath = new Path(oldParent, entry.getName());
- List<FilerProto.Entry> entries = filerClient.listEntries(entryPath.toUri().getPath());
- for (FilerProto.Entry ent : entries) {
- boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName()));
- if (!isSucess) {
- return false;
- }
- }
- }
-
- return filerClient.deleteEntry(
- oldParent.toUri().getPath(), entry.getName(), false, false);
-
+ filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
}
public OutputStream createFile(final Path path,
@@ -199,10 +183,10 @@ public class SeaweedFileSystemStore {
entry = FilerProto.Entry.newBuilder();
entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
+ LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
+ writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+ replication = existingEntry.getAttributes().getReplication();
}
- LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
- replication = existingEntry.getAttributes().getReplication();
}
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
@@ -294,4 +278,5 @@ public class SeaweedFileSystemStore {
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
}
+
}
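The rename path above now hands the whole move to the filer in a single call instead of the removed copy-then-delete recursion. In caller terms (paths illustrative):

    // One filer-side move replaces the per-entry createEntry/deleteEntry walk:
    filerClient.mv("/user/hive/tmp/part-0001", "/user/hive/warehouse/part-0001");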
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 90c14c772..90c14c772 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
new file mode 100644
index 000000000..e08843caa
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -0,0 +1,280 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedWrite;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.concurrent.*;
+
+import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
+
+public class SeaweedOutputStream extends OutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
+
+ private final FilerGrpcClient filerGrpcClient;
+ private final Path path;
+ private final int bufferSize;
+ private final int maxConcurrentRequestCount;
+ private final ThreadPoolExecutor threadExecutor;
+ private final ExecutorCompletionService<Void> completionService;
+ private FilerProto.Entry.Builder entry;
+ private long position;
+ private boolean closed;
+ private boolean supportFlush = true;
+ private volatile IOException lastError;
+ private long lastFlushOffset;
+ private long lastTotalAppendOffset = 0;
+ private byte[] buffer;
+ private int bufferIndex;
+ private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private String replication = "000";
+
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.replication = replication;
+ this.path = path;
+ this.position = position;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[bufferSize];
+ this.bufferIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+
+ this.entry = entry;
+
+ }
+
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+ try {
+ SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ this.lastFlushOffset = offset;
+ }
+
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[]{(byte) (byteVal & 0xFF)});
+ }
+
+ @Override
+ public synchronized void write(final byte[] data, final int off, final int length)
+ throws IOException {
+ maybeThrowLastError();
+
+ Preconditions.checkArgument(data != null, "null data");
+
+ if (off < 0 || length < 0 || length > data.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int currentOffset = off;
+ int writableBytes = bufferSize - bufferIndex;
+ int numberOfBytesToWrite = length;
+
+ while (numberOfBytesToWrite > 0) {
+ if (writableBytes <= numberOfBytesToWrite) {
+ System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
+ bufferIndex += writableBytes;
+ writeCurrentBufferToService();
+ currentOffset += writableBytes;
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ } else {
+ System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
+ bufferIndex += numberOfBytesToWrite;
+ numberOfBytesToWrite = 0;
+ }
+
+ writableBytes = bufferSize - bufferIndex;
+ }
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the payload it is committed to the
+ * service. Data is queued for writing and forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (supportFlush) {
+ flushInternalAsync();
+ }
+ }
+
+ /**
+ * Force all data in the output stream to be written to SeaweedFS.
+ * Wait to return until this is complete. Close the access to the stream and
+ * shut down the upload thread pool.
+ * Any error caught in the worker threads and stored will be rethrown here
+ * after cleanup.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ LOG.debug("close path: {}", path);
+ try {
+ flushInternal();
+ threadExecutor.shutdown();
+ } finally {
+ lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ buffer = null;
+ bufferIndex = 0;
+ closed = true;
+ writeOperations.clear();
+ if (!threadExecutor.isShutdown()) {
+ threadExecutor.shutdownNow();
+ }
+ }
+ }
+
+ private synchronized void writeCurrentBufferToService() throws IOException {
+ if (bufferIndex == 0) {
+ return;
+ }
+
+ final byte[] bytes = buffer;
+ final int bytesLength = bufferIndex;
+
+ buffer = new byte[bufferSize];
+ bufferIndex = 0;
+ final long offset = position;
+ position += bytesLength;
+
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ waitForTaskToComplete();
+ }
+
+ final Future<Void> job = completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // originally: client.append(path, offset, bytes, 0, bytesLength);
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
+ return null;
+ }
+ });
+
+ writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+ }
+
+ private void waitForTaskToComplete() throws IOException {
+ boolean completed;
+ for (completed = false; completionService.poll() != null; completed = true) {
+ // keep polling until there is no data
+ }
+
+ if (!completed) {
+ try {
+ completionService.take();
+ } catch (InterruptedException e) {
+ lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+ throw lastError;
+ }
+ }
+ }
+
+ private void maybeThrowLastError() throws IOException {
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Try to remove the completed write operations from the beginning of write
+ * operation FIFO queue.
+ */
+ private synchronized void shrinkWriteOperationQueue() throws IOException {
+ try {
+ while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+ writeOperations.peek().task.get();
+ lastTotalAppendOffset += writeOperations.peek().length;
+ writeOperations.remove();
+ }
+ } catch (Exception e) {
+ lastError = new IOException(e);
+ throw lastError;
+ }
+ }
+
+ private synchronized void flushInternal() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToService();
+ }
+
+ private synchronized void flushInternalAsync() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToServiceAsync();
+ }
+
+ private synchronized void flushWrittenBytesToService() throws IOException {
+ for (WriteOperation writeOperation : writeOperations) {
+ try {
+ writeOperation.task.get();
+ } catch (Exception ex) {
+ lastError = new IOException(ex);
+ throw lastError;
+ }
+ }
+ LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
+ flushWrittenBytesToServiceInternal(position);
+ }
+
+ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+ shrinkWriteOperationQueue();
+
+ if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
+ }
+ }
+
+ private static class WriteOperation {
+ private final Future<Void> task;
+ private final long startOffset;
+ private final long length;
+
+ WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+ Preconditions.checkNotNull(task, "task");
+ Preconditions.checkArgument(startOffset >= 0, "startOffset");
+ Preconditions.checkArgument(length >= 0, "length");
+
+ this.task = task;
+ this.startOffset = startOffset;
+ this.length = length;
+ }
+ }
+
+}
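A minimal usage sketch of the stream above, assuming a FilerGrpcClient and an entry builder have already been obtained (the buffer size and replication string are illustrative):

    Path path = new Path("/buckets/demo/file.bin");
    SeaweedOutputStream out = new SeaweedOutputStream(
            filerGrpcClient, path, entry,
            0L,               // starting write position
            8 * 1024 * 1024,  // bufferSize
            "000");           // replication
    out.write(payload);       // buffered; full buffers upload on pool threads
    out.flush();              // pushes buffered chunks and file metadata to the filer
    out.close();              // drains in-flight uploads, then shuts the pool down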
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
new file mode 100644
index 000000000..f5d14acdd
--- /dev/null
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>oss-parent</artifactId>
+ <groupId>org.sonatype.oss</groupId>
+ <version>9</version>
+ <relativePath>../pom.xml/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-hadoop3-client</artifactId>
+ <version>${seaweedfs.client.version}</version>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>7</source>
+ <target>7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>org/slf4j/**</exclude>
+ <exclude>META-INF/maven/org.slf4j/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer />
+ </transformers>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc.internal</pattern>
+ <shadedPattern>shaded.io.grpc.internal</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>shaded.org.apache.commons</shadedPattern>
+ <excludes>
+ <exclude>org.apache.hadoop</exclude>
+ <exclude>org.apache.log4j</exclude>
+ </excludes>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>shaded.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <properties>
+ <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <hadoop.version>3.1.1</hadoop.version>
+ </properties>
+</project>
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs3/pom.xml
index a0cab8752..8c88b60df 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,12 +5,12 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.0.5</seaweedfs.client.version>
+ <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
<groupId>com.github.chrislusf</groupId>
- <artifactId>seaweedfs-hadoop-client</artifactId>
+ <artifactId>seaweedfs-hadoop3-client</artifactId>
<version>${seaweedfs.client.version}</version>
<parent>
@@ -79,6 +79,10 @@
<exclude>org.apache.log4j</exclude>
</excludes>
</relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>shaded.org.apache.http</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
new file mode 100644
index 000000000..926d0b83b
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+import java.util.concurrent.CountDownLatch;
+
+class ReadBuffer {
+
+ private SeaweedInputStream stream;
+ private long offset; // offset within the file for the buffer
+ private int length; // actual length, set after the buffer is filled
+ private int requestedLength; // requested length of the read
+ private byte[] buffer; // the buffer itself
+ private int bufferindex = -1; // index in the buffers array in the buffer manager
+ private ReadBufferStatus status; // status of the buffer
+ private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
+ // waiting on this buffer gets unblocked
+
+ // fields to help with eviction logic
+ private long timeStamp = 0; // tick at which buffer became available to read
+ private boolean isFirstByteConsumed = false;
+ private boolean isLastByteConsumed = false;
+ private boolean isAnyByteConsumed = false;
+
+ public SeaweedInputStream getStream() {
+ return stream;
+ }
+
+ public void setStream(SeaweedInputStream stream) {
+ this.stream = stream;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ public int getRequestedLength() {
+ return requestedLength;
+ }
+
+ public void setRequestedLength(int requestedLength) {
+ this.requestedLength = requestedLength;
+ }
+
+ public byte[] getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(byte[] buffer) {
+ this.buffer = buffer;
+ }
+
+ public int getBufferindex() {
+ return bufferindex;
+ }
+
+ public void setBufferindex(int bufferindex) {
+ this.bufferindex = bufferindex;
+ }
+
+ public ReadBufferStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(ReadBufferStatus status) {
+ this.status = status;
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public boolean isFirstByteConsumed() {
+ return isFirstByteConsumed;
+ }
+
+ public void setFirstByteConsumed(boolean isFirstByteConsumed) {
+ this.isFirstByteConsumed = isFirstByteConsumed;
+ }
+
+ public boolean isLastByteConsumed() {
+ return isLastByteConsumed;
+ }
+
+ public void setLastByteConsumed(boolean isLastByteConsumed) {
+ this.isLastByteConsumed = isLastByteConsumed;
+ }
+
+ public boolean isAnyByteConsumed() {
+ return isAnyByteConsumed;
+ }
+
+ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
+ this.isAnyByteConsumed = isAnyByteConsumed;
+ }
+
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
new file mode 100644
index 000000000..5b1e21529
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package seaweed.hdfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * The read buffer manager for SeaweedFS read-ahead (adapted from the Azure ABFS client).
+ */
+final class ReadBufferManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+
+ private static final int NUM_BUFFERS = 16;
+ private static final int BLOCK_SIZE = 4 * 1024 * 1024;
+ private static final int NUM_THREADS = 8;
+ private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+
+ private Thread[] threads = new Thread[NUM_THREADS];
+ private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
+ private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
+
+ private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
+ private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
+ private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
+ private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+
+ static {
+ BUFFER_MANAGER = new ReadBufferManager();
+ BUFFER_MANAGER.init();
+ }
+
+ static ReadBufferManager getBufferManager() {
+ return BUFFER_MANAGER;
+ }
+
+ private void init() {
+ buffers = new byte[NUM_BUFFERS][];
+ for (int i = 0; i < NUM_BUFFERS; i++) {
+ buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
+ freeList.add(i);
+ }
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Thread t = new Thread(new ReadBufferWorker(i));
+ t.setDaemon(true);
+ threads[i] = t;
+ t.setName("SeaweedFS-prefetch-" + i);
+ t.start();
+ }
+ ReadBufferWorker.UNLEASH_WORKERS.countDown();
+ }
+
+ // hide instance constructor
+ private ReadBufferManager() {
+ }
+
+
+ /*
+ *
+ * SeaweedInputStream-facing methods
+ *
+ */
+
+
+ /**
+ * {@link SeaweedInputStream} calls this method to queue read-aheads.
+ *
+ * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
+ * @param requestedOffset The offset in the file which should be read
+ * @param requestedLength The length to read
+ */
+ void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
+ stream.getPath(), requestedOffset, requestedLength);
+ }
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream, requestedOffset)) {
+ return; // already queued, do not queue again
+ }
+ if (freeList.isEmpty() && !tryEvict()) {
+ return; // no buffers available, cannot queue anything
+ }
+
+ buffer = new ReadBuffer();
+ buffer.setStream(stream);
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+
+ Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
+
+ buffer.setBuffer(buffers[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ readAheadQueue.add(buffer);
+ notifyAll();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+ stream.getPath(), requestedOffset, buffer.getBufferindex());
+ }
+ }
+
+
+ /**
+ * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in the buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because
+ * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+ *
+ * @param stream the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
+ */
+ int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
+ // not synchronized, so have to be careful with locking
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("getBlock for file {} position {} thread {}",
+ stream.getPath(), position, Thread.currentThread().getName());
+ }
+
+ waitForProcess(stream, position);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+ }
+ if (bytesRead > 0) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Done read from Cache for {} position {} length {}",
+ stream.getPath(), position, bytesRead);
+ }
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
+ return 0;
+ }
+
+ /*
+ *
+ * Internal methods
+ *
+ */
+
+ private void waitForProcess(final SeaweedInputStream stream, final long position) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ clearFromReadAheadQueue(stream, position);
+ readBuf = getFromList(inProgressList, stream, position);
+ }
+ if (readBuf != null) { // if in in-progress queue, then block for it
+ try {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
+ stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
+ }
+ readBuf.getLatch().await(); // blocking wait on the caller stream's thread
+ // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // inProgressList. So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+ // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("latch done for file {} buffer idx {} length {}",
+ stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
+ }
+ }
+ }
+
+ /**
+ * If any buffer in the completed list can be reclaimed, then reclaim it and return the buffer to the free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
+ *
+ * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+ */
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (completedReadList.size() <= 0) {
+ return false; // there are no evict-able buffers
+ }
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return evict(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return evict(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ long earliestBirthday = Long.MAX_VALUE;
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.getTimeStamp() < earliestBirthday) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ }
+ }
+ if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+ return evict(nodeToEvict);
+ }
+
+ // nothing can be evicted
+ return false;
+ }
+
+ private boolean evict(final ReadBuffer buf) {
+ freeList.push(buf.getBufferindex());
+ completedReadList.remove(buf);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
+ buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
+ }
+ return true;
+ }
+
+ private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(readAheadQueue, stream, requestedOffset)
+ || isInList(inProgressList, stream, requestedOffset)
+ || isInList(completedReadList, stream, requestedOffset));
+ }
+
+ private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
+ return (getFromList(list, stream, requestedOffset) != null);
+ }
+
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (buffer.getStream() == stream) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
+ ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+ if (buffer != null) {
+ readAheadQueue.remove(buffer);
+ notifyAll(); // lock is held in calling method
+ freeList.push(buffer.getBufferindex());
+ }
+ }
+
+ private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
+ final byte[] buffer) {
+ ReadBuffer buf = getFromList(completedReadList, stream, position);
+ if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+ return 0;
+ }
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+ return lengthToCopy;
+ }
+
+ /*
+ *
+ * ReadBufferWorker-thread-facing methods
+ *
+ */
+
+ /**
+ * ReadBufferWorker thread calls this to get the next buffer that it should work on.
+ *
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
+ */
+ ReadBuffer getNextBlockToRead() throws InterruptedException {
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ //buffer = readAheadQueue.take(); // blocking method
+ while (readAheadQueue.size() == 0) {
+ wait();
+ }
+ buffer = readAheadQueue.remove();
+ notifyAll();
+ if (buffer == null) {
+ return null; // should never happen
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ inProgressList.add(buffer);
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+ buffer.getStream().getPath(), buffer.getOffset());
+ }
+ return buffer;
+ }
+
+ /**
+ * ReadBufferWorker thread calls this method to post completion.
+ *
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
+ */
+ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+ buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
+ }
+ synchronized (this) {
+ inProgressList.remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setTimeStamp(currentTimeMillis());
+ buffer.setLength(bytesActuallyRead);
+ completedReadList.add(buffer);
+ } else {
+ freeList.push(buffer.getBufferindex());
+ // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and be eligible for GC
+ }
+ }
+ // outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
+ }
+
+ /**
+ * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+ * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
+ * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
+ * Note: it is not monotonic across Sockets, and even within a CPU, its only the
+ * more recent parts which share a clock across all cores.
+ *
+ * @return current time in milliseconds
+ */
+ private long currentTimeMillis() {
+ return System.nanoTime() / 1000 / 1000;
+ }
+}
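How a reader is expected to drive the manager above, in sketch form (the stream is a SeaweedInputStream; offsets and lengths are illustrative):

    ReadBufferManager mgr = ReadBufferManager.getBufferManager();
    // Schedule read-ahead of the next block while the caller consumes this one.
    mgr.queueReadAhead(stream, nextOffset, 4 * 1024 * 1024);
    // Later, try the prefetch cache first; a return of 0 means a miss,
    // so the caller falls back to its own direct read against the filer.
    byte[] dest = new byte[length];
    int fromCache = mgr.getBlock(stream, position, length, dest);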
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java
new file mode 100644
index 000000000..d63674977
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+/**
+ * Status of a read-ahead buffer as it moves through the ReadBufferManager lists.
+ */
+public enum ReadBufferStatus {
+    NOT_AVAILABLE, // buffers sitting in the read-ahead queue have this status
+    READING_IN_PROGRESS, // a read is in progress on this buffer; it should be in inProgressList
+    AVAILABLE, // data is available in this buffer; it should be in completedReadList
+    READ_FAILED // the read attempt completed, but failed
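+
+    // Typical lifecycle, as driven by ReadBufferManager (a sketch inferred from this diff):
+    //   NOT_AVAILABLE -> READING_IN_PROGRESS -> AVAILABLE    (successful read)
+    //   NOT_AVAILABLE -> READING_IN_PROGRESS -> READ_FAILED  (worker hit an exception)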
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
new file mode 100644
index 000000000..6ffbc4644
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package seaweed.hdfs;
+
+import java.util.concurrent.CountDownLatch;
+
+class ReadBufferWorker implements Runnable {
+
+ protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
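+    // UNLEASH_WORKERS is presumably counted down once by ReadBufferManager after it has created
+    // all worker threads, releasing them together; until then run() blocks in await() below.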
+ private int id;
+
+ ReadBufferWorker(final int id) {
+ this.id = id;
+ }
+
+    /**
+     * @return the ID of this ReadBufferWorker
+     */
+ public int getId() {
+ return this.id;
+ }
+
+ /**
+ * Waits until a buffer becomes available in ReadAheadQueue.
+ * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
+ * Rinse and repeat. Forever.
+ */
+ public void run() {
+ try {
+ UNLEASH_WORKERS.await();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ ReadBuffer buffer;
+ while (true) {
+ try {
+ buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (buffer != null) {
+ try {
+ // do the actual read, from the file.
+ int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+ bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
+ } catch (Exception ex) {
+ bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
+ }
+ }
+ }
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
new file mode 100644
index 000000000..c12da8261
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -0,0 +1,620 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
+public class SeaweedFileSystem extends FileSystem {
+
+ public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+ public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
+ public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+
+ private URI uri;
+ private Path workingDirectory = new Path("/");
+ private SeaweedFileSystemStore seaweedFileSystemStore;
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public String getScheme() {
+ return "seaweedfs";
+ }
+
+ @Override
+    public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+
+ // get host information from uri (overrides info in conf)
+ String host = uri.getHost();
+ host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host;
+ if (host == null) {
+ throw new IOException("Invalid host specified");
+ }
+ conf.set(FS_SEAWEED_FILER_HOST, host);
+
+ // get port information from uri, (overrides info in conf)
+ int port = uri.getPort();
+ port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
+ conf.setInt(FS_SEAWEED_FILER_PORT, port);
+
+ conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
+
+ setConf(conf);
+ this.uri = uri;
+
+ seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
+
+ }
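+
+    // Example client-side setup (a sketch; the host/port values here are assumptions):
+    //   Configuration conf = new Configuration();
+    //   conf.set("fs.seaweed.filer.host", "localhost");
+    //   conf.setInt("fs.seaweed.filer.port", 8888);
+    //   FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);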
+
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+
+ LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
+
+ path = qualify(path);
+
+ try {
+ InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
+ return new FSDataInputStream(inputStream);
+ } catch (Exception ex) {
+ LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
+ return null;
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
+ final short replication, final long blockSize, final Progressable progress) throws IOException {
+
+ LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
+
+ path = qualify(path);
+
+ try {
+ String replicaPlacement = String.format("%03d", replication - 1);
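+            // HDFS replication counts total copies while SeaweedFS placement counts extra copies,
+            // so e.g. replication=3 maps to replica placement "002"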
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+ return new FSDataOutputStream(outputStream, statistics);
+ } catch (Exception ex) {
+ LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
+ return null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+     * @throws FileNotFoundException if the parent directory is not present or
+     * is not a directory.
+ */
+ @Override
+ public FSDataOutputStream createNonRecursive(Path path,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ Path parent = path.getParent();
+ if (parent != null) {
+ // expect this to raise an exception if there is no parent
+ if (!getFileStatus(parent).isDirectory()) {
+ throw new FileAlreadyExistsException("Not a directory: " + parent);
+ }
+ }
+ return create(path, permission,
+ flags.contains(CreateFlag.OVERWRITE), bufferSize,
+ replication, blockSize, progress);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
+
+ LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
+
+ path = qualify(path);
+ try {
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+ return new FSDataOutputStream(outputStream, statistics);
+ } catch (Exception ex) {
+ LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
+ return null;
+ }
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) {
+
+ LOG.debug("rename path: {} => {}", src, dst);
+
+ if (src.isRoot()) {
+ return false;
+ }
+
+ if (src.equals(dst)) {
+ return true;
+ }
+ FileStatus dstFileStatus = getFileStatus(dst);
+
+ String sourceFileName = src.getName();
+ Path adjustedDst = dst;
+
+ if (dstFileStatus != null) {
+ if (!dstFileStatus.isDirectory()) {
+ return false;
+ }
+ adjustedDst = new Path(dst, sourceFileName);
+ }
+
+ Path qualifiedSrcPath = qualify(src);
+ Path qualifiedDstPath = qualify(adjustedDst);
+
+ seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
+ return true;
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) {
+
+ LOG.debug("delete path: {} recursive:{}", path, recursive);
+
+ path = qualify(path);
+
+ FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus == null) {
+ return true;
+ }
+
+ return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
+
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+
+ LOG.debug("listStatus path: {}", path);
+
+ path = qualify(path);
+
+ return seaweedFileSystemStore.listEntries(path);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDirectory;
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ if (path.isAbsolute()) {
+ workingDirectory = path;
+ } else {
+ workingDirectory = new Path(workingDirectory, path);
+ }
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+
+ LOG.debug("mkdirs path: {}", path);
+
+ path = qualify(path);
+
+ FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus == null) {
+
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ return seaweedFileSystemStore.createDirectory(path, currentUser,
+ fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
+ FsPermission.getUMask(getConf()));
+
+ }
+
+ if (fileStatus.isDirectory()) {
+ return true;
+ } else {
+ throw new FileAlreadyExistsException("Path is a file: " + path);
+ }
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) {
+
+ LOG.debug("getFileStatus path: {}", path);
+
+ path = qualify(path);
+
+ return seaweedFileSystemStore.getFileStatus(path);
+ }
+
+ /**
+ * Set owner of a path (i.e. a file or a directory).
+ * The parameters owner and group cannot both be null.
+ *
+ * @param path The path
+ * @param owner If it is null, the original username remains unchanged.
+ * @param group If it is null, the original groupname remains unchanged.
+ */
+ @Override
+ public void setOwner(Path path, final String owner, final String group)
+ throws IOException {
+ LOG.debug("setOwner path: {}", path);
+ path = qualify(path);
+
+ seaweedFileSystemStore.setOwner(path, owner, group);
+ }
+
+
+ /**
+ * Set permission of a path.
+ *
+ * @param path The path
+ * @param permission Access permission
+ */
+ @Override
+ public void setPermission(Path path, final FsPermission permission) throws IOException {
+ LOG.debug("setPermission path: {}", path);
+
+ if (permission == null) {
+ throw new IllegalArgumentException("The permission can't be null");
+ }
+
+ path = qualify(path);
+
+ seaweedFileSystemStore.setPermission(path, permission);
+ }
+
+ Path qualify(Path path) {
+ return path.makeQualified(uri, workingDirectory);
+ }
+
+ /**
+ * Concat existing files together.
+ *
+ * @param trg the path to the target destination.
+ * @param psrcs the paths to the sources to use for the concatenation.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ @Override
+ public void concat(final Path trg, final Path[] psrcs) throws IOException {
+ throw new UnsupportedOperationException("Not implemented by the " +
+ getClass().getSimpleName() + " FileSystem implementation");
+ }
+
+ /**
+ * Truncate the file in the indicated path to the indicated size.
+ * <ul>
+ * <li>Fails if path is a directory.</li>
+ * <li>Fails if path does not exist.</li>
+ * <li>Fails if path is not closed.</li>
+ * <li>Fails if new size is greater than current size.</li>
+ * </ul>
+ *
+ * @param f The path to the file to be truncated
+ * @param newLength The size the file is to be truncated to
+ * @return <code>true</code> if the file has been truncated to the desired
+ * <code>newLength</code> and is immediately available to be reused for
+ * write operations such as <code>append</code>, or
+ * <code>false</code> if a background process of adjusting the length of
+ * the last block has been started, and clients should wait for it to
+ * complete before proceeding with further file updates.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default).
+ */
+ @Override
+ public boolean truncate(Path f, long newLength) throws IOException {
+ throw new UnsupportedOperationException("Not implemented by the " +
+ getClass().getSimpleName() + " FileSystem implementation");
+ }
+
+ @Override
+ public void createSymlink(final Path target, final Path link,
+ final boolean createParent) throws AccessControlException,
+ FileAlreadyExistsException, FileNotFoundException,
+ ParentNotDirectoryException, UnsupportedFileSystemException,
+ IOException {
+ // Supporting filesystems should override this method
+ throw new UnsupportedOperationException(
+ "Filesystem does not support symlinks!");
+ }
+
+ public boolean supportsSymlinks() {
+ return false;
+ }
+
+ /**
+ * Create a snapshot.
+ *
+ * @param path The directory where snapshots will be taken.
+ * @param snapshotName The name of the snapshot
+ * @return the snapshot path.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ */
+ @Override
+ public Path createSnapshot(Path path, String snapshotName)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support createSnapshot");
+ }
+
+ /**
+ * Rename a snapshot.
+ *
+ * @param path The directory path where the snapshot was taken
+ * @param snapshotOldName Old name of the snapshot
+ * @param snapshotNewName New name of the snapshot
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void renameSnapshot(Path path, String snapshotOldName,
+ String snapshotNewName) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support renameSnapshot");
+ }
+
+ /**
+ * Delete a snapshot of a directory.
+ *
+ * @param path The directory that the to-be-deleted snapshot belongs to
+ * @param snapshotName The name of the snapshot
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void deleteSnapshot(Path path, String snapshotName)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support deleteSnapshot");
+ }
+
+ /**
+ * Modifies ACL entries of files and directories. This method can add new ACL
+ * entries or modify the permissions on existing ACL entries. All existing
+ * ACL entries that are not specified in this call are retained without
+ * changes. (Modifications are merged into the current ACL.)
+ *
+ * @param path Path to modify
+ * @param aclSpec List&lt;AclEntry&gt; describing modifications
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support modifyAclEntries");
+ }
+
+ /**
+ * Removes ACL entries from files and directories. Other ACL entries are
+ * retained.
+ *
+ * @param path Path to modify
+ * @param aclSpec List describing entries to remove
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeAclEntries");
+ }
+
+ /**
+ * Removes all default ACL entries from files and directories.
+ *
+ * @param path Path to modify
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeDefaultAcl(Path path)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeDefaultAcl");
+ }
+
+ /**
+ * Removes all but the base ACL entries of files and directories. The entries
+ * for user, group, and others are retained for compatibility with permission
+ * bits.
+ *
+ * @param path Path to modify
+ * @throws IOException if an ACL could not be removed
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeAcl(Path path)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeAcl");
+ }
+
+ /**
+ * Fully replaces ACL of files and directories, discarding all existing
+ * entries.
+ *
+ * @param path Path to modify
+ * @param aclSpec List describing modifications, which must include entries
+ * for user, group, and others for compatibility with permission bits.
+ * @throws IOException if an ACL could not be modified
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setAcl");
+ }
+
+ /**
+ * Gets the ACL of a file or directory.
+ *
+ * @param path Path to get
+ * @return AclStatus describing the ACL of the file or directory
+ * @throws IOException if an ACL could not be read
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public AclStatus getAclStatus(Path path) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getAclStatus");
+ }
+
+ /**
+ * Set an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to modify
+ * @param name xattr name.
+ * @param value xattr value.
+ * @param flag xattr set flag
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void setXAttr(Path path, String name, byte[] value,
+ EnumSet<XAttrSetFlag> flag) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support setXAttr");
+ }
+
+ /**
+ * Get an xattr name and value for a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attribute
+ * @param name xattr name.
+ * @return byte[] xattr value.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public byte[] getXAttr(Path path, String name) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttr");
+ }
+
+ /**
+ * Get all of the xattr name/value pairs for a file or directory.
+ * Only those xattrs which the logged-in user has permissions to view
+ * are returned.
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @return Map describing the XAttrs of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttrs");
+ }
+
+ /**
+ * Get all of the xattrs name/value pairs for a file or directory.
+ * Only those xattrs which the logged-in user has permissions to view
+ * are returned.
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @param names XAttr names.
+ * @return Map describing the XAttrs of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+ throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support getXAttrs");
+ }
+
+ /**
+ * Get all of the xattr names for a file or directory.
+ * Only those xattr names which the logged-in user has permissions to view
+ * are returned.
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to get extended attributes
+ * @return List{@literal <String>} of the XAttr names of the file or directory
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public List<String> listXAttrs(Path path) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support listXAttrs");
+ }
+
+ /**
+ * Remove an xattr of a file or directory.
+ * The name must be prefixed with the namespace followed by ".". For example,
+ * "user.attr".
+ * <p>
+ * Refer to the HDFS extended attributes user documentation for details.
+ *
+ * @param path Path to remove extended attribute
+ * @param name xattr name
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the operation is unsupported
+ * (default outcome).
+ */
+ @Override
+ public void removeXAttr(Path path, String name) throws IOException {
+ throw new UnsupportedOperationException(getClass().getSimpleName()
+ + " doesn't support removeXAttr");
+ }
+
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
new file mode 100644
index 000000000..9617a38be
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -0,0 +1,282 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedRead;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class SeaweedFileSystemStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
+
+ private FilerGrpcClient filerGrpcClient;
+ private FilerClient filerClient;
+
+ public SeaweedFileSystemStore(String host, int port) {
+ int grpcPort = 10000 + port;
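+        // by SeaweedFS convention, the filer's gRPC port is its HTTP port plus 10000 (e.g. 8888 -> 18888)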
+ filerGrpcClient = new FilerGrpcClient(host, grpcPort);
+ filerClient = new FilerClient(filerGrpcClient);
+ }
+
+ public static String getParentDirectory(Path path) {
+ return path.isRoot() ? "/" : path.getParent().toUri().getPath();
+ }
+
+ static int permissionToMode(FsPermission permission, boolean isDirectory) {
+ int p = permission.toShort();
+ if (isDirectory) {
+ p = p | 1 << 31;
+ }
+ return p;
+ }
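+
+    // e.g. rwxr-xr-x (0755) on a directory: 0755 -> 0x1ED, and with bit 31 set -> 0x800001ED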
+
+ public boolean createDirectory(final Path path, UserGroupInformation currentUser,
+ final FsPermission permission, final FsPermission umask) {
+
+ LOG.debug("createDirectory path: {} permission: {} umask: {}",
+ path,
+ permission,
+ umask);
+
+ return filerClient.mkdirs(
+ path.toUri().getPath(),
+ permissionToMode(permission, true),
+ currentUser.getUserName(),
+ currentUser.getGroupNames()
+ );
+ }
+
+ public FileStatus[] listEntries(final Path path) {
+ LOG.debug("listEntries path: {}", path);
+
+ FileStatus pathStatus = getFileStatus(path);
+
+ if (pathStatus == null) {
+ return new FileStatus[0];
+ }
+
+ if (!pathStatus.isDirectory()) {
+ return new FileStatus[]{pathStatus};
+ }
+
+ List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
+
+ List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
+
+ for (FilerProto.Entry entry : entries) {
+
+ FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry);
+
+ fileStatuses.add(fileStatus);
+ }
+ LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
+ return fileStatuses.toArray(new FileStatus[0]);
+
+ }
+
+ public FileStatus getFileStatus(final Path path) {
+
+ FilerProto.Entry entry = lookupEntry(path);
+ if (entry == null) {
+ return null;
+ }
+ LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
+
+ FileStatus fileStatus = doGetFileStatus(path, entry);
+ return fileStatus;
+ }
+
+ public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
+ LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
+ path,
+ String.valueOf(isDirectory),
+ String.valueOf(recursive));
+
+ if (path.isRoot()) {
+ return true;
+ }
+
+ if (recursive && isDirectory) {
+ List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
+ for (FilerProto.Entry entry : entries) {
+ deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true);
+ }
+ }
+
+ return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive, true);
+ }
+
+ private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
+ FilerProto.FuseAttributes attributes = entry.getAttributes();
+ long length = SeaweedRead.totalSize(entry.getChunksList());
+ boolean isDir = entry.getIsDirectory();
+        int blockReplication = 1;
+        int blockSize = 512;
+        long modificationTime = attributes.getMtime() * 1000; // seconds -> milliseconds
+        long accessTime = 0;
+        FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
+        String owner = attributes.getUserName();
+        String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
+        return new FileStatus(length, isDir, blockReplication, blockSize,
+                modificationTime, accessTime, permission, owner, group, null, path);
+ }
+
+ private FilerProto.Entry lookupEntry(Path path) {
+
+ return filerClient.lookupEntry(getParentDirectory(path), path.getName());
+
+ }
+
+ public void rename(Path source, Path destination) {
+
+ LOG.debug("rename source: {} destination:{}", source, destination);
+
+ if (source.isRoot()) {
+ return;
+ }
+ LOG.info("rename source: {} destination:{}", source, destination);
+ FilerProto.Entry entry = lookupEntry(source);
+ if (entry == null) {
+ LOG.warn("rename non-existing source: {}", source);
+ return;
+ }
+ filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
+ }
+
+ public OutputStream createFile(final Path path,
+ final boolean overwrite,
+ FsPermission permission,
+ int bufferSize,
+ String replication) throws IOException {
+
+ permission = permission == null ? FsPermission.getFileDefault() : permission;
+
+ LOG.debug("createFile path: {} overwrite: {} permission: {}",
+ path,
+ overwrite,
+ permission.toString());
+
+ UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
+ long now = System.currentTimeMillis() / 1000L;
+
+ FilerProto.Entry.Builder entry = null;
+ long writePosition = 0;
+ if (!overwrite) {
+ FilerProto.Entry existingEntry = lookupEntry(path);
+ LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry);
+ if (existingEntry != null) {
+ entry = FilerProto.Entry.newBuilder();
+ entry.mergeFrom(existingEntry);
+ entry.getAttributesBuilder().setMtime(now);
+ LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
+ writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
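+                // not overwriting an existing file: subsequent writes append at the current end of the file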
+ replication = existingEntry.getAttributes().getReplication();
+ }
+ }
+ if (entry == null) {
+ entry = FilerProto.Entry.newBuilder()
+ .setName(path.getName())
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(permissionToMode(permission, false))
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .setUserName(userGroupInformation.getUserName())
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
+ );
+ }
+
+ return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+
+ }
+
+ public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
+ int bufferSize) throws IOException {
+
+ LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+
+ int readAheadQueueDepth = 2;
+ FilerProto.Entry entry = lookupEntry(path);
+
+ if (entry == null) {
+ throw new FileNotFoundException("read non-exist file " + path);
+ }
+
+ return new SeaweedInputStream(filerGrpcClient,
+ statistics,
+ path.toUri().getPath(),
+ entry,
+ bufferSize,
+ readAheadQueueDepth);
+ }
+
+ public void setOwner(Path path, String owner, String group) {
+
+ LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group);
+
+ FilerProto.Entry entry = lookupEntry(path);
+ if (entry == null) {
+ LOG.debug("setOwner path:{} entry:{}", path, entry);
+ return;
+ }
+
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
+
+ if (owner != null) {
+ attributesBuilder.setUserName(owner);
+ }
+ if (group != null) {
+ attributesBuilder.clearGroupName();
+ attributesBuilder.addGroupName(group);
+ }
+
+ entryBuilder.setAttributes(attributesBuilder);
+
+ LOG.debug("setOwner path:{} entry:{}", path, entryBuilder);
+
+ filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
+
+ }
+
+ public void setPermission(Path path, FsPermission permission) {
+
+ LOG.debug("setPermission path:{} permission:{}", path, permission);
+
+ FilerProto.Entry entry = lookupEntry(path);
+ if (entry == null) {
+ LOG.debug("setPermission path:{} entry:{}", path, entry);
+ return;
+ }
+
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
+
+ attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory()));
+
+ entryBuilder.setAttributes(attributesBuilder);
+
+ LOG.debug("setPermission path:{} entry:{}", path, entryBuilder);
+
+ filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
+
+ }
+
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
new file mode 100644
index 000000000..90c14c772
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -0,0 +1,371 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedRead;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+public class SeaweedInputStream extends FSInputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+
+ private final FilerGrpcClient filerGrpcClient;
+ private final Statistics statistics;
+ private final String path;
+ private final FilerProto.Entry entry;
+ private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
+ private final long contentLength;
+ private final int bufferSize; // default buffer size
+ private final int readAheadQueueDepth; // initialized in constructor
+    private final boolean readAheadEnabled; // whether read-ahead is enabled
+
+ private byte[] buffer = null; // will be initialized on first use
+
+ private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
+ private long fCursorAfterLastRead = -1;
+ private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
+ private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
+ // of valid bytes in buffer)
+ private boolean closed = false;
+
+ public SeaweedInputStream(
+ final FilerGrpcClient filerGrpcClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry,
+ final int bufferSize,
+ final int readAheadQueueDepth) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.statistics = statistics;
+ this.path = path;
+ this.entry = entry;
+ this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
+ this.bufferSize = bufferSize;
+ this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
+ this.readAheadEnabled = true;
+
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int numberOfBytesRead = read(b, 0, 1);
+ if (numberOfBytesRead < 0) {
+ return -1;
+ } else {
+ return (b[0] & 0xFF);
+ }
+ }
+
+ @Override
+ public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+ int currentOff = off;
+ int currentLen = len;
+ int lastReadBytes;
+ int totalReadBytes = 0;
+ do {
+ lastReadBytes = readOneBlock(b, currentOff, currentLen);
+ if (lastReadBytes > 0) {
+ currentOff += lastReadBytes;
+ currentLen -= lastReadBytes;
+ totalReadBytes += lastReadBytes;
+ }
+ if (currentLen <= 0 || currentLen > b.length - currentOff) {
+ break;
+ }
+ } while (lastReadBytes > 0);
+ return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
+ }
+
+ private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ Preconditions.checkNotNull(b);
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (this.available() == 0) {
+ return -1;
+ }
+
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ //If buffer is empty, then fill the buffer.
+ if (bCursor == limit) {
+ //If EOF, then return -1
+ if (fCursor >= contentLength) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ //reset buffer to initial state - i.e., throw away existing data
+ bCursor = 0;
+ limit = 0;
+ if (buffer == null) {
+ buffer = new byte[bufferSize];
+ }
+
+ // Enable readAhead when reading sequentially
+ if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+ bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+ } else {
+ bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+ }
+
+ if (bytesRead == -1) {
+ return -1;
+ }
+
+ limit += bytesRead;
+ fCursor += bytesRead;
+ fCursorAfterLastRead = fCursor;
+ }
+
+ //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
+ //(bytes returned may be less than requested)
+ int bytesRemaining = limit - bCursor;
+ int bytesToRead = Math.min(len, bytesRemaining);
+ System.arraycopy(buffer, bCursor, b, off, bytesToRead);
+ bCursor += bytesToRead;
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesToRead);
+ }
+ return bytesToRead;
+ }
+
+
+ private int readInternal(final long position, final byte[] b, final int offset, final int length,
+ final boolean bypassReadAhead) throws IOException {
+ if (readAheadEnabled && !bypassReadAhead) {
+ // try reading from read-ahead
+ if (offset != 0) {
+ throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
+ }
+ int receivedBytes;
+
+ // queue read-aheads
+ int numReadAheads = this.readAheadQueueDepth;
+ long nextSize;
+ long nextOffset = position;
+ while (numReadAheads > 0 && nextOffset < contentLength) {
+ nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+ ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
+ nextOffset = nextOffset + nextSize;
+ numReadAheads--;
+ }
+
+ // try reading from buffers first
+ receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
+ if (receivedBytes > 0) {
+ return receivedBytes;
+ }
+
+ // got nothing from read-ahead, do our own read now
+ receivedBytes = readRemote(position, b, offset, length);
+ return receivedBytes;
+ } else {
+ return readRemote(position, b, offset, length);
+ }
+ }
+
+ int readRemote(long position, byte[] b, int offset, int length) throws IOException {
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+ if (b == null) {
+ throw new IllegalArgumentException("null byte array passed in to read() method");
+ }
+ if (offset >= b.length) {
+ throw new IllegalArgumentException("offset greater than length of array");
+ }
+ if (length < 0) {
+ throw new IllegalArgumentException("requested read length is less than zero");
+ }
+ if (length > (b.length - offset)) {
+ throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ }
+
+ long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
+ if (bytesRead > Integer.MAX_VALUE) {
+ throw new IOException("Unexpected Content-Length");
+ }
+ return (int) bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ if (n < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (n > contentLength) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+
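+        // the buffer currently holds file bytes [fCursor - limit, fCursor); a seek into that window
+        // (or to fCursor itself) only needs to reposition bCursor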
+ if (n >= fCursor - limit && n <= fCursor) { // within buffer
+ bCursor = (int) (n - (fCursor - limit));
+ return;
+ }
+
+ // next read will read from here
+ fCursor = n;
+
+ //invalidate buffer
+ limit = 0;
+ bCursor = 0;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ long currentPos = getPos();
+ if (currentPos == contentLength) {
+ if (n > 0) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ }
+ long newPos = currentPos + n;
+ if (newPos < 0) {
+ newPos = 0;
+ n = newPos - currentPos;
+ }
+ if (newPos > contentLength) {
+ newPos = contentLength;
+ n = newPos - currentPos;
+ }
+ seek(newPos);
+ return n;
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw new IOException(
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ final long remaining = this.contentLength - this.getPos();
+ return remaining <= Integer.MAX_VALUE
+ ? (int) remaining : Integer.MAX_VALUE;
+ }
+
+ /**
+     * Returns the length of the file that this stream refers to. Note that the length returned is the length
+     * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+     * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ return contentLength;
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ return fCursor - limit + bCursor;
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ buffer = null; // de-reference the buffer so it can be GC'ed sooner
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+     * Gets whether mark and reset are supported by this stream. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 4f307ff96..96af27fe0 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -78,9 +78,6 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
-
- LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
-
try {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
} catch (Exception ex) {