Diffstat (limited to 'other/java/client')
-rw-r--r--  other/java/client/pom.xml                                                    24
-rw-r--r--  other/java/client/pom.xml.deploy                                            170
-rw-r--r--  other/java/client/pom_debug.xml                                             144
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java         42
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ChunkCache.java             36
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java     140
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerClient.java           170
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java        84
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/Gzip.java                   41
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java          55
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java    208
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java   328
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java           300
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java            30
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java          121
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java          36
-rw-r--r--  other/java/client/src/main/proto/filer.proto                                152
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java      42
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java         5
19 files changed, 1940 insertions, 188 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 0c585a941..f4e522a3e 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.4</version>
+ <version>1.6.4</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -16,7 +17,7 @@
<protobuf.version>3.9.1</protobuf.version>
<!-- follow https://github.com/grpc/grpc-java -->
<grpc.version>1.23.0</grpc.version>
- <guava.version>28.0-jre</guava.version>
+ <guava.version>30.0-jre</guava.version>
</properties>
<dependencies>
@@ -64,9 +65,14 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.12</version>
+ <version>4.13.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
</dependencies>
<distributionManagement>
@@ -88,8 +94,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -97,9 +103,11 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
<pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
</configuration>
<executions>
<execution>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
new file mode 100644
index 000000000..9c8c4f45e
--- /dev/null
+++ b/other/java/client/pom.xml.deploy
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.6.4</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
new file mode 100644
index 000000000..12ea860c2
--- /dev/null
+++ b/other/java/client/pom_debug.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.6.4</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..51053becd
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,42 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+ private static final List<ByteBuffer> bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ bufferSize = MIN_BUFFER_SIZE;
+ }
+ LOG.debug("requested new buffer {}", bufferSize);
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+ if (buffer.capacity() >= bufferSize) {
+ return buffer;
+ }
+
+ LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+ bufferList.add(0, buffer);
+ return ByteBuffer.allocate(bufferSize);
+
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ ((Buffer)obj).clear();
+ bufferList.add(0, obj);
+ }
+
+}
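
For reference, a minimal usage sketch of the ByteBufferPool above; the 4 MB request size and the try/finally pattern are illustrative, not part of this commit:

    // request() rounds anything below MIN_BUFFER_SIZE up to 8 MB
    ByteBuffer buffer = ByteBufferPool.request(4 * 1024 * 1024);
    try {
        buffer.put("hello".getBytes());
    } finally {
        // release() clears the buffer and returns it to the shared free list
        ByteBufferPool.release(buffer);
    }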
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
new file mode 100644
index 000000000..58870d742
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -0,0 +1,36 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class ChunkCache {
+
+ private Cache<String, byte[]> cache = null;
+
+ public ChunkCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(1, TimeUnit.HOURS)
+ .build();
+ }
+
+ public byte[] getChunk(String fileId) {
+ if (this.cache == null) {
+ return null;
+ }
+ return this.cache.getIfPresent(fileId);
+ }
+
+ public void setChunk(String fileId, byte[] data) {
+ if (this.cache == null) {
+ return;
+ }
+ this.cache.put(fileId, data);
+ }
+
+}
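
A quick sketch of the ChunkCache behavior above: with maxEntries == 0 the Guava cache is never built, so both methods degrade to no-ops. The file id is a made-up example:

    ChunkCache cache = new ChunkCache(4);
    cache.setChunk("3,01637037d6", new byte[]{1, 2, 3}); // hypothetical fileId
    byte[] hit = cache.getChunk("3,01637037d6");         // returns the cached bytes

    ChunkCache disabled = new ChunkCache(0);
    disabled.setChunk("3,01637037d6", new byte[]{1});    // silently ignored
    byte[] miss = disabled.getChunk("3,01637037d6");     // always null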
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..9b6ba5dfc
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,140 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileChunkManifest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+ private static final int mergeFactor = 1000;
+
+ public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (chunk.getIsChunkManifest()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<FilerProto.FileChunk> resolveChunkManifest(
+ final FilerClient filerClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ continue;
+ }
+
+ // IsChunkManifest
+ LOG.debug("fetching chunk manifest:{}", chunk);
+ byte[] data = fetchChunk(filerClient, chunk);
+ FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+ List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk t : m.getChunksList()) {
+ // avoid deprecated chunk.getFileId()
+ resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+ }
+ dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks));
+ }
+
+ return dataChunks;
+ }
+
+ private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException {
+
+ String vid = "" + chunk.getFid().getVolumeId();
+ FilerProto.Locations locations = filerClient.vidLocations.get(vid);
+ if (locations == null) {
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ locations = lookupResponse.getLocationsMapMap().get(vid);
+ filerClient.vidLocations.put(vid, locations);
+ LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
+ }
+
+ SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+ FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+ 0,
+ -1,
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed());
+
+ byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+ if (chunkData == null) {
+ LOG.debug("doFetchFullChunkData:{}", chunkView);
+ chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations);
+ }
+ if (chunk.getIsChunkManifest()) {
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+ }
+
+ return chunkData;
+
+ }
+
+ public static List<FilerProto.FileChunk> maybeManifestize(
+ final FilerClient filerClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
+ // the return variable
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : inputChunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ } else {
+ chunks.add(chunk);
+ }
+ }
+
+ int remaining = dataChunks.size();
+ for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
+ chunks.add(chunk);
+ remaining -= mergeFactor;
+ }
+
+ // remaining
+ for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+ chunks.add(dataChunks.get(i));
+ }
+ return chunks;
+ }
+
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
+ // create and serialize the manifest
+ dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
+ FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+ byte[] data = m.build().toByteArray();
+
+ long minOffset = Long.MAX_VALUE;
+ long maxOffset = -1;
+ for (FilerProto.FileChunk chunk : dataChunks) {
+ minOffset = Math.min(minOffset, chunk.getOffset());
+ maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+ }
+
+ FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+ filerClient.getReplication(),
+ filerClient,
+ minOffset,
+ data, 0, data.length, parentDirectory);
+ manifestChunk.setIsChunkManifest(true);
+ manifestChunk.setSize(maxOffset - minOffset);
+ return manifestChunk.build();
+
+ }
+
+}
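
To make the mergeFactor arithmetic above concrete: with 2,500 data chunks, the loop folds chunks [0,1000) and [1000,2000) into two manifest chunks, and the trailing 500 pass through unchanged. A hedged sketch, assuming a filerClient and inputChunks list already exist:

    // inputChunks: 2500 plain data chunks; "/buckets/test" is an example directory
    List<FilerProto.FileChunk> out =
            FileChunkManifest.maybeManifestize(filerClient, inputChunks, "/buckets/test");
    // out.size() == 502: two manifest chunks plus the 500 remaining data chunks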
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 84aa26ad9..257a9873d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -1,27 +1,82 @@
package seaweedfs.client;
+import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-public class FilerClient {
+public class FilerClient extends FilerGrpcClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
- private FilerGrpcClient filerGrpcClient;
-
public FilerClient(String host, int grpcPort) {
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
+ super(host, grpcPort);
+ }
+
+ public static String toFileId(FilerProto.FileId fid) {
+ if (fid == null) {
+ return null;
+ }
+ return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ }
+
+ public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+ if (fileIdStr == null || fileIdStr.length() == 0) {
+ return null;
+ }
+ int commaIndex = fileIdStr.lastIndexOf(',');
+ String volumeIdStr = fileIdStr.substring(0, commaIndex);
+ String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+ String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+ return FilerProto.FileId.newBuilder()
+ .setVolumeId(Integer.parseInt(volumeIdStr))
+ .setFileKey(Long.parseLong(fileKeyStr, 16))
+ .setCookie((int) Long.parseLong(cookieStr, 16))
+ .build();
+ }
+
+ public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+ List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : chunks) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.clearFileId();
+ chunkBuilder.clearSourceFileId();
+ chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+ FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+ if (sourceFid != null) {
+ chunkBuilder.setSourceFid(sourceFid);
+ }
+ cleanedChunks.add(chunkBuilder.build());
+ }
+ return cleanedChunks;
}
- public FilerClient(FilerGrpcClient filerGrpcClient) {
- this.filerGrpcClient = filerGrpcClient;
+ public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.setFileId(toFileId(chunk.getFid()));
+ String sourceFileId = toFileId(chunk.getSourceFid());
+ if (sourceFileId != null) {
+ chunkBuilder.setSourceFileId(sourceFileId);
+ }
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
}
public boolean mkdirs(String path, int mode) {
@@ -38,9 +93,9 @@ public class FilerClient {
if ("/".equals(path)) {
return true;
}
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent().replace('\\','/');
+ String name = pathFile.getName();
mkdirs(parent, mode, uid, gid, userName, groupNames);
@@ -59,13 +114,13 @@ public class FilerClient {
public boolean mv(String oldPath, String newPath) {
- Path oldPathObject = Paths.get(oldPath);
- String oldParent = oldPathObject.getParent().toString();
- String oldName = oldPathObject.getFileName().toString();
+ File oldPathFile = new File(oldPath);
+ String oldParent = oldPathFile.getParent().replace('\\','/');
+ String oldName = oldPathFile.getName();
- Path newPathObject = Paths.get(newPath);
- String newParent = newPathObject.getParent().toString();
- String newName = newPathObject.getFileName().toString();
+ File newPathFile = new File(newPath);
+ String newParent = newPathFile.getParent().replace('\\','/');
+ String newName = newPathFile.getName();
return atomicRenameEntry(oldParent, oldName, newParent, newName);
@@ -73,9 +128,9 @@ public class FilerClient {
public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent().replace('\\','/');
+ String name = pathFile.getName();
return deleteEntry(
parent,
@@ -92,9 +147,9 @@ public class FilerClient {
public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent().replace('\\','/');
+ String name = pathFile.getName();
FilerProto.Entry entry = lookupEntry(parent, name);
if (entry == null) {
@@ -156,7 +211,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
- List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+ List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@@ -173,31 +228,35 @@ public class FilerClient {
return results;
}
- public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
- Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
+ Iterator<FilerProto.ListEntriesResponse> iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
+ .setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
- while (iter.hasNext()){
+ while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
- entries.add(fixEntryAfterReading(resp.getEntry()));
+ entries.add(afterEntryDeserialization(resp.getEntry()));
}
return entries;
}
public FilerProto.Entry lookupEntry(String directory, String entryName) {
try {
- FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
+ FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry(
FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(directory)
.setName(entryName)
.build()).getEntry();
- return fixEntryAfterReading(entry);
+ if (entry == null) {
+ return null;
+ }
+ return afterEntryDeserialization(entry);
} catch (Exception e) {
- if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){
+ if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
}
LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);
@@ -205,28 +264,32 @@ public class FilerClient {
}
}
-
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
- filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ FilerProto.CreateEntryResponse createEntryResponse =
+ this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
+ if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
+ return true;
+ }
+ LOG.warn("createEntry {}/{} error: {}", parent, entry.getName(), createEntryResponse.getError());
+ return false;
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
}
- return true;
}
public boolean updateEntry(String parent, FilerProto.Entry entry) {
try {
- filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
+ this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
.setDirectory(parent)
.setEntry(entry)
.build());
} catch (Exception e) {
- LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
+ LOG.warn("updateEntry {}/{}: {}", parent, entry.getName(), e);
return false;
}
return true;
@@ -234,7 +297,7 @@ public class FilerClient {
public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) {
try {
- filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
+ this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(parent)
.setName(entryName)
.setIsDeleteData(isDeleteFileChunk)
@@ -250,7 +313,7 @@ public class FilerClient {
public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
try {
- filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
+ this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
.setOldDirectory(oldParent)
.setOldName(oldName)
.setNewDirectory(newParent)
@@ -263,24 +326,13 @@ public class FilerClient {
return true;
}
- private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
- if (entry.getChunksList().size() <= 0) {
- return entry;
- }
- String fileId = entry.getChunks(0).getFileId();
- if (fileId != null && fileId.length() != 0) {
- return entry;
- }
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- entryBuilder.clearChunks();
- for (FilerProto.FileChunk chunk : entry.getChunksList()) {
- FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
- FilerProto.FileId fid = chunk.getFid();
- fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
- chunkBuilder.setFileId(fileId);
- entryBuilder.addChunks(chunkBuilder);
- }
- return entryBuilder.build();
+ public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
+ return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
+ .setPathPrefix(prefix)
+ .setClientName(clientName)
+ .setSinceNs(sinceNs)
+ .build()
+ );
}
}
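
The new static helpers convert between the string form "volumeId,fileKeyHexCookieHex" and the structured FilerProto.FileId, where the cookie is always the last eight hex digits. A round-trip sketch with an illustrative id:

    FilerProto.FileId fid = FilerClient.toFileIdObject("3,01637037d6");
    // volumeId = 3, fileKey = 0x01, cookie = 0x637037d6
    String back = FilerClient.toFileId(fid);
    // "3,1637037d6" (the file key's leading zero is dropped by %x)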
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3626c76de..6c57e2e0d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -9,17 +9,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
-
- private final ManagedChannel channel;
- private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
- private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
- private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
-
static SslContext sslContext;
static {
@@ -30,6 +26,20 @@ public class FilerGrpcClient {
}
}
+ public final int VOLUME_SERVER_ACCESS_DIRECT = 0;
+ public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1;
+ public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
+ public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
+ private final ManagedChannel channel;
+ private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
+ private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
+ private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+ private boolean cipher = false;
+ private String collection = "";
+ private String replication = "";
+ private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ private String filerAddress;
+
public FilerGrpcClient(String host, int grpcPort) {
this(host, grpcPort, sslContext);
}
@@ -37,20 +47,43 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+ .maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
+ filerAddress = String.format("%s:%d", host, grpcPort - 10000);
+
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
+ this.getBlockingStub().getFilerConfiguration(
+ FilerProto.GetFilerConfigurationRequest.newBuilder().build());
+ cipher = filerConfigurationResponse.getCipher();
+ collection = filerConfigurationResponse.getCollection();
+ replication = filerConfigurationResponse.getReplication();
+
}
- public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
+ private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
asyncStub = SeaweedFilerGrpc.newStub(channel);
futureStub = SeaweedFilerGrpc.newFutureStub(channel);
}
+ public boolean isCipher() {
+ return cipher;
+ }
+
+ public String getCollection() {
+ return collection;
+ }
+
+ public String getReplication() {
+ return replication;
+ }
+
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
@@ -67,4 +100,39 @@ public class FilerGrpcClient {
return futureStub;
}
+ public void setAccessVolumeServerDirectly() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ }
+
+ public boolean isAccessVolumeServerDirectly() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT;
+ }
+
+ public void setAccessVolumeServerByPublicUrl() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+
+ public boolean isAccessVolumeServerByPublicUrl() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+
+ public void setAccessVolumeServerByFilerProxy() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+
+ public boolean isAccessVolumeServerByFilerProxy() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+
+ public String getChunkUrl(String chunkId, String url, String publicUrl) {
+ switch (this.volumeServerAccess) {
+ case VOLUME_SERVER_ACCESS_PUBLIC_URL:
+ return String.format("http://%s/%s", publicUrl, chunkId);
+ case VOLUME_SERVER_ACCESS_FILER_PROXY:
+ return String.format("http://%s/?proxyChunkId=%s", this.filerAddress, chunkId);
+ default:
+ return String.format("http://%s/%s", url, chunkId);
+ }
+ }
+
}
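
The three volume-server access modes only change how getChunkUrl forms URLs; the host names and ports below are placeholders:

    FilerGrpcClient client = new FilerClient("localhost", 18888); // example gRPC port

    client.getChunkUrl("3,01637037d6", "vol1:8080", "vol1.public:8080");
    // direct (default):      http://vol1:8080/3,01637037d6

    client.setAccessVolumeServerByPublicUrl();
    // same call now returns: http://vol1.public:8080/3,01637037d6

    client.setAccessVolumeServerByFilerProxy();
    // same call now returns: http://localhost:8888/?proxyChunkId=3,01637037d6
    // (filerAddress is derived as the gRPC port minus 10000)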
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
new file mode 100644
index 000000000..4909094f5
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -0,0 +1,41 @@
+package seaweedfs.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class Gzip {
+ public static byte[] compress(byte[] data) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
+ GZIPOutputStream gzip = new GZIPOutputStream(bos);
+ gzip.write(data);
+ gzip.close();
+ byte[] compressed = bos.toByteArray();
+ bos.close();
+ return compressed;
+ }
+
+ public static byte[] decompress(byte[] compressed) {
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ return readAll(gis);
+ } catch (Exception e) {
+ return compressed;
+ }
+ }
+
+ private static byte[] readAll(InputStream input) throws IOException {
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[4096];
+ int n;
+ while (-1 != (n = input.read(buffer))) {
+ output.write(buffer, 0, n);
+ }
+ return output.toByteArray();
+ }
+ }
+}
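
A round-trip sketch for the Gzip helper above. Because decompress() swallows errors and returns its input, feeding it plain, non-gzip bytes is safe:

    byte[] data = "hello seaweedfs".getBytes();
    byte[] packed = Gzip.compress(data);       // throws IOException on write failure
    byte[] unpacked = Gzip.decompress(packed); // equal to data
    byte[] passthru = Gzip.decompress(data);   // not gzip, returned unchanged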
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
new file mode 100644
index 000000000..8d0ebd755
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java
@@ -0,0 +1,55 @@
+package seaweedfs.client;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.security.SecureRandom;
+
+public class SeaweedCipher {
+ // AES-GCM parameters
+ public static final int AES_KEY_SIZE = 256; // in bits
+ public static final int GCM_NONCE_LENGTH = 12; // in bytes
+ public static final int GCM_TAG_LENGTH = 16; // in bytes
+
+ private static SecureRandom random = new SecureRandom();
+
+ public static byte[] genCipherKey() throws Exception {
+ byte[] key = new byte[AES_KEY_SIZE / 8];
+ random.nextBytes(key);
+ return key;
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception {
+ return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey);
+ }
+
+ public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception {
+
+ final byte[] nonce = new byte[GCM_NONCE_LENGTH];
+ random.nextBytes(nonce);
+ GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+
+ Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec);
+
+ byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length);
+
+ byte[] iv = AES_cipherInstance.getIV();
+ byte[] message = new byte[GCM_NONCE_LENGTH + length + GCM_TAG_LENGTH]; // size from the requested length, not the whole input array, so partial encrypts are sized correctly
+ System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length);
+
+ return message;
+ }
+
+ public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception {
+ final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding");
+ GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH);
+ SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES");
+ AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params);
+ byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH);
+ return decryptedText;
+ }
+
+}
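
A minimal round trip for the cipher above; the sealed message layout is the 12-byte nonce, then the ciphertext, then the 16-byte GCM tag that doFinal() appends:

    byte[] key = SeaweedCipher.genCipherKey();          // 256-bit AES key
    byte[] sealed = SeaweedCipher.encrypt("secret".getBytes(), key);
    byte[] opened = SeaweedCipher.decrypt(sealed, key); // equal to "secret".getBytes()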
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
new file mode 100644
index 000000000..4e40ce1b6
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -0,0 +1,208 @@
+package seaweedfs.client;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class SeaweedInputStream extends InputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+ private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
+
+ private final FilerClient filerClient;
+ private final String path;
+ private final FilerProto.Entry entry;
+ private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
+ private final long contentLength;
+
+ private long position = 0; // cursor of the file
+
+ private boolean closed = false;
+
+ public SeaweedInputStream(
+ final FilerClient filerClient,
+ final String fullpath) throws IOException {
+ this.path = fullpath;
+ this.filerClient = filerClient;
+ this.entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(fullpath),
+ SeaweedOutputStream.getFileName(fullpath));
+ this.contentLength = SeaweedRead.fileSize(entry);
+
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public SeaweedInputStream(
+ final FilerClient filerClient,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.filerClient = filerClient;
+ this.path = path;
+ this.entry = entry;
+ this.contentLength = SeaweedRead.fileSize(entry);
+
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ int numberOfBytesRead = read(b, 0, 1);
+ if (numberOfBytesRead < 0) {
+ return -1;
+ } else {
+ return (b[0] & 0xFF);
+ }
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+
+ if (b == null) {
+ throw new IllegalArgumentException("null byte array passed in to read() method");
+ }
+ if (off >= b.length) {
+ throw new IllegalArgumentException("offset greater than length of array");
+ }
+ if (len < 0) {
+ throw new IllegalArgumentException("requested read length is less than zero");
+ }
+ if (len > (b.length - off)) {
+ throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ }
+
+ ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+ return read(buf);
+
+ }
+
+ // implement ByteBufferReadable
+ public synchronized int read(ByteBuffer buf) throws IOException {
+
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+
+ long bytesRead = 0;
+ int len = buf.remaining();
+ int start = (int) this.position;
+ if (start + len <= entry.getContent().size()) {
+ entry.getContent().substring(start, start + len).copyTo(buf);
+ bytesRead = len; // count the bytes served from the inlined entry content
+ } else {
+ bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ }
+
+ if (bytesRead > Integer.MAX_VALUE) {
+ throw new IOException("Unexpected Content-Length");
+ }
+
+ if (bytesRead > 0) {
+ this.position += bytesRead;
+ }
+
+ return (int) bytesRead;
+ }
+
+ public synchronized void seek(long n) throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ if (n < 0) {
+ throw new EOFException("Cannot seek to a negative offset");
+ }
+ if (n > contentLength) {
+ throw new EOFException("Attempted to seek or read past the end of the file");
+ }
+ this.position = n;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ if (this.position == contentLength) {
+ if (n > 0) {
+ throw new EOFException("Attempted to seek or read past the end of the file");
+ }
+ }
+ long newPos = this.position + n;
+ if (newPos < 0) {
+ newPos = 0;
+ n = newPos - this.position;
+ }
+ if (newPos > contentLength) {
+ newPos = contentLength;
+ n = newPos - this.position;
+ }
+ seek(newPos);
+ return n;
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ final long remaining = this.contentLength - this.position;
+ return remaining <= Integer.MAX_VALUE
+ ? (int) remaining : Integer.MAX_VALUE;
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they won't be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ return contentLength;
+ }
+
+ public synchronized long getPos() throws IOException {
+ if (closed) {
+ throw EXCEPTION_STREAM_IS_CLOSED;
+ }
+ return position;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ }
+
+}
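
A read-side usage sketch, assuming a filer reachable on localhost and an existing file (both illustrative):

    FilerClient filerClient = new FilerClient("localhost", 18888);
    SeaweedInputStream in = new SeaweedInputStream(filerClient, "/some/path/file.txt");
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf, 0, buf.length)) != -1) {
        // consume n bytes
    }
    in.close();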
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
new file mode 100644
index 000000000..ba298a713
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -0,0 +1,328 @@
+package seaweedfs.client;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+
+public class SeaweedOutputStream extends OutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
+ protected final boolean supportFlush = true;
+ private final FilerClient filerClient;
+ private final String path;
+ private final int bufferSize;
+ private final int maxConcurrentRequestCount;
+ private final ThreadPoolExecutor threadExecutor;
+ private final ExecutorCompletionService<Void> completionService;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private final boolean shouldSaveMetadata = false;
+ private FilerProto.Entry.Builder entry;
+ private long position;
+ private boolean closed;
+ private volatile IOException lastError;
+ private long lastFlushOffset;
+ private long lastTotalAppendOffset = 0;
+ private ByteBuffer buffer;
+ private long outputIndex;
+ private String replication = "000";
+
+ public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
+ this(filerClient, fullpath, "000");
+ }
+
+ public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) {
+ this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, replication); // pass the caller's replication through instead of hard-coding "000"
+ }
+
+ public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ this.filerClient = filerClient;
+ this.replication = replication;
+ this.path = path;
+ this.position = position;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = bufferSize;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+
+ this.entry = entry;
+ if (this.entry == null) {
+ long now = System.currentTimeMillis() / 1000L;
+
+ this.entry = FilerProto.Entry.newBuilder()
+ .setName(getFileName(path))
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(0755)
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .clearGroupName()
+ );
+ }
+
+ }
+
+ public static String getParentDirectory(String path) {
+ int protoIndex = path.indexOf("://");
+ if (protoIndex >= 0) {
+ int pathStart = path.indexOf("/", protoIndex+3);
+ path = path.substring(pathStart);
+ }
+ if (path.equals("/")) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ if (lastSlashIndex == 0) {
+ return "/";
+ }
+ return path.substring(0, lastSlashIndex);
+ }
+
+ public static String getFileName(String path) {
+ if (path.indexOf("/") < 0) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(lastSlashIndex + 1);
+ }
+
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+ try {
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ this.lastFlushOffset = offset;
+ }
+
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[]{(byte) (byteVal & 0xFF)});
+ }
+
+ @Override
+ public synchronized void write(final byte[] data, final int off, final int length)
+ throws IOException {
+ maybeThrowLastError();
+
+ if (data == null) {
+ return;
+ }
+
+ if (off < 0 || length < 0 || length > data.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
+ int currentOffset = off;
+ int writableBytes = bufferSize - buffer.position();
+ int numberOfBytesToWrite = length;
+
+ while (numberOfBytesToWrite > 0) {
+
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
+ }
+
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
+ }
+
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the payload it is committed to the
+ * service. Data is queued for writing and forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (supportFlush) {
+ flushInternalAsync();
+ }
+ }
+
+ /**
+ * Force all data in the output stream to be written to SeaweedFS storage.
+ * Wait to return until this is complete. Close the access to the stream and
+ * shut down the upload thread pool.
+ * Any error caught in a worker thread and stored in lastError will be
+ * rethrown here after cleanup.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ LOG.debug("close path: {}", path);
+ try {
+ flushInternal();
+ threadExecutor.shutdown();
+ } finally {
+ lastError = new IOException("Stream is closed!");
+ ByteBufferPool.release(buffer);
+ buffer = null;
+ outputIndex = 0;
+ closed = true;
+ writeOperations.clear();
+ if (!threadExecutor.isShutdown()) {
+ threadExecutor.shutdownNow();
+ }
+ }
+
+ }
+
+ private synchronized void writeCurrentBufferToService() throws IOException {
+ if (buffer.position() == 0) {
+ return;
+ }
+
+ position += submitWriteBufferToService(buffer, position);
+
+ buffer = ByteBufferPool.request(bufferSize);
+
+ }
+
+ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
+
+ ((Buffer)bufferToWrite).flip();
+ int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
+
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
+ waitForTaskToComplete();
+ }
+ final Future<Void> job = completionService.submit(() -> {
+ // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
+ // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ ByteBufferPool.release(bufferToWrite);
+ return null;
+ });
+
+ writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+
+ return bytesLength;
+
+ }
+
+ private void waitForTaskToComplete() throws IOException {
+ boolean completed;
+ for (completed = false; completionService.poll() != null; completed = true) {
+ // keep polling until there is no data
+ }
+
+ if (!completed) {
+ try {
+ completionService.take();
+ } catch (InterruptedException e) {
+ lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+ throw lastError;
+ }
+ }
+ }
+
+ private void maybeThrowLastError() throws IOException {
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Try to remove the completed write operations from the beginning of write
+ * operation FIFO queue.
+ */
+ private synchronized void shrinkWriteOperationQueue() throws IOException {
+ try {
+ while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+ writeOperations.peek().task.get();
+ lastTotalAppendOffset += writeOperations.peek().length;
+ writeOperations.remove();
+ }
+ } catch (Exception e) {
+ lastError = new IOException(e);
+ throw lastError;
+ }
+ }
+
+ protected synchronized void flushInternal() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToService();
+ }
+
+ protected synchronized void flushInternalAsync() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToServiceAsync();
+ }
+
+ private synchronized void flushWrittenBytesToService() throws IOException {
+ for (WriteOperation writeOperation : writeOperations) {
+ try {
+ writeOperation.task.get();
+ } catch (Exception ex) {
+ lastError = new IOException(ex);
+ throw lastError;
+ }
+ }
+ LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
+ flushWrittenBytesToServiceInternal(position);
+ }
+
+ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+ shrinkWriteOperationQueue();
+
+ if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
+ }
+ }
+
+ private static class WriteOperation {
+ private final Future<Void> task;
+ private final long startOffset;
+ private final long length;
+
+ WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+ this.task = task;
+ this.startOffset = startOffset;
+ this.length = length;
+ }
+ }
+
+}
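
And the matching write-side sketch (same illustrative host and path); close() flushes the remaining buffer and commits the entry metadata via writeMeta:

    FilerClient filerClient = new FilerClient("localhost", 18888);
    SeaweedOutputStream out = new SeaweedOutputStream(filerClient, "/some/path/file.txt");
    out.write("hello".getBytes());
    out.close();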
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 2efa64580..384636601 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -1,88 +1,200 @@
package seaweedfs.client;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.GzipDecompressingEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.*;
public class SeaweedRead {
- // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
+ static ChunkCache chunkCache = new ChunkCache(4);
+ static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024);
// returns bytesRead
- public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
- final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) throws IOException {
+ public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
+ final long position, final ByteBuffer buf, final long fileSize) throws IOException {
+
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+ Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
for (ChunkView chunkView : chunkViews) {
String vid = parseVolumeId(chunkView.fileId);
- lookupRequest.addVolumeIds(vid);
+ FilerProto.Locations locations = volumeIdCache.getLocations(vid);
+ if (locations == null) {
+ lookupRequest.addVolumeIds(vid);
+ } else {
+ knownLocations.put(vid, locations);
+ }
}
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
-
- Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ if (lookupRequest.getVolumeIdsCount() > 0) {
+ FilerProto.LookupVolumeResponse lookupResponse = filerClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) {
+ volumeIdCache.setLocations(entry.getKey(), entry.getValue());
+ knownLocations.put(entry.getKey(), entry.getValue());
+ }
+ }
//TODO parallel this
long readCount = 0;
- int startOffset = bufferOffset;
+ long startOffset = position;
for (ChunkView chunkView : chunkViews) {
- FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
- if (locations.getLocationsCount() == 0) {
+
+ if (startOffset < chunkView.logicOffset) {
+ long gap = chunkView.logicOffset - startOffset;
+ LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+ buf.position(buf.position() + (int) gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
+ FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId));
+ if (locations == null || locations.getLocationsCount() == 0) {
+ LOG.error("failed to locate {}", chunkView.fileId);
// log here!
return 0;
}
- int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ int len = readChunkView(filerClient, startOffset, buf, chunkView, locations);
+
+ LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
readCount += len;
startOffset += len;
}
+ long limit = Math.min(buf.limit(), fileSize);
+
+ if (startOffset < limit) {
+ long gap = limit - startOffset;
+ LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+ buf.position(buf.position() + (int) gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
return readCount;
}
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
- HttpClient client = new DefaultHttpClient();
- HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
+
+ if (chunkData == null) {
+ chunkData = doFetchFullChunkData(filerClient, chunkView, locations);
+ chunkCache.setChunk(chunkView.fileId, chunkData);
+ }
+
+ int len = (int) chunkView.size;
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+ buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
+
+ return len;
+ }
+
+ public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] data = null;
+ IOException lastException = null;
+ for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
+ for (FilerProto.Location location : locations.getLocationsList()) {
+ String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl());
+ try {
+ data = doFetchOneFullChunkData(chunkView, url);
+ lastException = null;
+ break;
+ } catch (IOException ioe) {
+ LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
+ lastException = ioe;
+ }
+ }
+ if (data != null) {
+ break;
+ }
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
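+ // interrupted while backing off; fall through and retry the next attempt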
+ }
+ }
- if (!chunkView.isFullChunk) {
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
- request.setHeader(HttpHeaders.RANGE,
- String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size));
+ if (lastException != null) {
+ throw lastException;
}
+ LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
+ return data;
+
+ }
+
+ private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
+
+ HttpGet request = new HttpGet(url);
+
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
+
+ byte[] data = null;
+
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
try {
- HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
- int len = (int) (chunkView.logicOffset - position + chunkView.size);
- OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
- entity.writeTo(outputStream);
- // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
+ Header contentEncodingHeader = entity.getContentEncoding();
+
+ if (contentEncodingHeader != null) {
+ HeaderElement[] encodings = contentEncodingHeader.getElements();
+ for (int i = 0; i < encodings.length; i++) {
+ if (encodings[i].getName().equalsIgnoreCase("gzip")) {
+ entity = new GzipDecompressingEntity(entity);
+ break;
+ }
+ }
+ }
+
+ data = EntityUtils.toByteArray(entity);
- return len;
+ EntityUtils.consume(entity);
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
+ response.close();
+ request.releaseConnection();
+ }
+
+ if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
+ try {
+ data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
+ } catch (Exception e) {
+ throw new IOException("fail to decrypt", e);
}
}
+
+ if (chunkView.isCompressed) {
+ data = Gzip.decompress(data);
+ }
+
+ LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length);
+
+ return data;
+
}
protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) {
@@ -90,27 +202,40 @@ public class SeaweedRead {
long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) {
- if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ long chunkStart = Math.max(offset, chunk.start);
+ long chunkStop = Math.min(stop, chunk.stop);
+ if (chunkStart < chunkStop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
- chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
- isFullChunk
+ chunk.fileId,
+ chunkStart - chunk.start + chunk.chunkOffset,
+ chunkStop - chunkStart,
+ chunkStart,
+ isFullChunk,
+ chunk.cipherKey,
+ chunk.isCompressed
));
- offset = Math.min(chunk.stop, stop);
}
}
return views;
}
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+ final FilerClient filerClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+ chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList);
+
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- return (int) (a.getMtime() - b.getMtime());
+ // returning (int) (a.getMtime() - b.getMtime()) can overflow the int cast, so compare explicitly
+ if (a.getMtime() < b.getMtime()) {
+ return -1;
+ } else if (a.getMtime() > b.getMtime()) {
+ return 1;
+ }
+ return 0;
}
});
@@ -127,11 +252,14 @@ public class SeaweedRead {
List<VisibleInterval> newVisibles,
FilerProto.FileChunk chunk) {
VisibleInterval newV = new VisibleInterval(
- chunk.getOffset(),
- chunk.getOffset() + chunk.getSize(),
- chunk.getFileId(),
- chunk.getMtime(),
- true
+ chunk.getOffset(),
+ chunk.getOffset() + chunk.getSize(),
+ chunk.getFileId(),
+ chunk.getMtime(),
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed()
);
// easy cases to speed up
@@ -147,21 +275,27 @@ public class SeaweedRead {
for (VisibleInterval v : visibles) {
if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) {
newVisibles.add(new VisibleInterval(
- v.start,
- chunk.getOffset(),
- v.fileId,
- v.modifiedTime,
- false
+ v.start,
+ chunk.getOffset(),
+ v.fileId,
+ v.modifiedTime,
+ v.chunkOffset,
+ false,
+ v.cipherKey,
+ v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
if (v.start < chunkStop && chunkStop < v.stop) {
newVisibles.add(new VisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false
+ chunkStop,
+ v.stop,
+ v.fileId,
+ v.modifiedTime,
+ v.chunkOffset + (chunkStop - v.start),
+ false,
+ v.cipherKey,
+ v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -191,6 +325,10 @@ public class SeaweedRead {
return fileId;
}
+ public static long fileSize(FilerProto.Entry entry) {
+ return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ }
+
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
long size = 0;
for (FilerProto.FileChunk chunk : chunksList) {
@@ -207,25 +345,33 @@ public class SeaweedRead {
public final long stop;
public final long modifiedTime;
public final String fileId;
+ public final long chunkOffset;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
+ this.chunkOffset = chunkOffset;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isCompressed = isCompressed;
}
@Override
public String toString() {
return "VisibleInterval{" +
- "start=" + start +
- ", stop=" + stop +
- ", modifiedTime=" + modifiedTime +
- ", fileId='" + fileId + '\'' +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "start=" + start +
+ ", stop=" + stop +
+ ", modifiedTime=" + modifiedTime +
+ ", fileId='" + fileId + '\'' +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isCompressed=" + isCompressed +
+ '}';
}
}
@@ -235,24 +381,30 @@ public class SeaweedRead {
public final long size;
public final long logicOffset;
public final boolean isFullChunk;
+ public final byte[] cipherKey;
+ public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
+ this.cipherKey = cipherKey;
+ this.isCompressed = isCompressed;
}
@Override
public String toString() {
return "ChunkView{" +
- "fileId='" + fileId + '\'' +
- ", offset=" + offset +
- ", size=" + size +
- ", logicOffset=" + logicOffset +
- ", isFullChunk=" + isFullChunk +
- '}';
+ "fileId='" + fileId + '\'' +
+ ", offset=" + offset +
+ ", size=" + size +
+ ", logicOffset=" + logicOffset +
+ ", isFullChunk=" + isFullChunk +
+ ", cipherKey=" + Arrays.toString(cipherKey) +
+ ", isCompressed=" + isCompressed +
+ '}';
}
}
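
One subtle fix above is the chunk comparator: the old (int) (a.getMtime() - b.getMtime()) truncates a 64-bit difference to 32 bits, so timestamps far enough apart can compare as equal or even reversed, which would corrupt the visibility merge. A standalone illustration:

    // Standalone illustration of the comparator fix; not SeaweedFS code.
    public class MtimeCompareSketch {
        public static void main(String[] args) {
            long older = 0L;
            long newer = 1L << 32;                  // newer by 2^32 time units
            int broken = (int) (older - newer);     // truncates to 0: reports "equal"
            int fixed = Long.compare(older, newer); // -1: older sorts first, as intended
            System.out.println("broken=" + broken + " fixed=" + fixed);
        }
    }
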
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..c465d935f
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,30 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
+ static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+ static CloseableHttpClient httpClient;
+
+ static {
+ // Increase the max total connections to 200
+ cm.setMaxTotal(200);
+ // Increase the default max connections per route to 20
+ cm.setDefaultMaxPerRoute(20);
+
+ httpClient = HttpClientBuilder.create()
+ .setConnectionManager(cm)
+ .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+ .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+ .build();
+ }
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return httpClient;
+ }
+}
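
SeaweedUtil replaces the one-shot DefaultHttpClient per request with a single pooled client shared across reads and writes. Callers must consume the entity and close the response so the connection returns to the pool; a usage sketch (the URL is illustrative):

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.util.EntityUtils;

    public class PooledClientUsage {
        public static void main(String[] args) throws Exception {
            // Illustrative volume-server chunk URL
            HttpGet request = new HttpGet("http://localhost:8080/3,0144b02f1f90");
            CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
            try {
                byte[] body = EntityUtils.toByteArray(response.getEntity()); // fully consumes the entity
                System.out.println("fetched " + body.length + " bytes");
            } finally {
                response.close();            // returns the connection to the pool
                request.releaseConnection();
            }
        }
    }
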
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 0663e8d98..f8c0c76b6 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,68 +1,114 @@
package seaweedfs.client;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import com.google.protobuf.ByteString;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.security.SecureRandom;
+import java.util.List;
public class SeaweedWrite {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedWrite.class);
+
+ private static final SecureRandom random = new SecureRandom();
+
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
- final FilerGrpcClient filerGrpcClient,
+ final FilerClient filerClient,
final long offset,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
- FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
+ final long bytesOffset, final long bytesLength,
+ final String path) throws IOException {
+ FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
+ replication, filerClient, offset, bytes, bytesOffset, bytesLength, path);
+ synchronized (entry) {
+ entry.addChunks(chunkBuilder);
+ }
+ }
+
+ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength,
+ final String path) throws IOException {
+ FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection("")
- .setReplication(replication)
+ .setCollection(filerClient.getCollection())
+ .setReplication(replication == null ? filerClient.getReplication() : replication)
.setDataCenter("")
- .setReplication("")
.setTtlSec(0)
+ .setPath(path)
.build());
String fileId = response.getFileId();
- String url = response.getUrl();
String auth = response.getAuth();
- String targetUrl = String.format("http://%s/%s", url, fileId);
- String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength);
+ String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl());
- entry.addChunks(FilerProto.FileChunk.newBuilder()
+ ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
+ byte[] cipherKey = null;
+ if (filerClient.isCipher()) {
+ cipherKey = genCipherKey();
+ cipherKeyString = ByteString.copyFrom(cipherKey);
+ }
+
+ String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
+
+ LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
+
+ return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
- );
-
+ .setCipherKey(cipherKeyString);
}
- public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
- filerGrpcClient.getBlockingStub().createEntry(
- FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parentDirectory)
- .setEntry(entry)
- .build()
- );
+ public static void writeMeta(final FilerClient filerClient,
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
+
+ synchronized (entry) {
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
+ entry.clearChunks();
+ entry.addAllChunks(chunks);
+ filerClient.getBlockingStub().createEntry(
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parentDirectory)
+ .setEntry(entry)
+ .build()
+ );
+ }
}
private static String multipartUpload(String targetUrl,
String auth,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
-
- HttpClient client = new DefaultHttpClient();
-
- InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
+
+ InputStream inputStream = null;
+ if (cipherKey == null || cipherKey.length == 0) {
+ inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+ } else {
+ try {
+ byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey);
+ inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length);
+ } catch (Exception e) {
+ throw new IOException("fail to encrypt data", e);
+ }
+ }
HttpPost post = new HttpPost(targetUrl);
if (auth != null && auth.length() != 0) {
@@ -74,8 +120,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
try {
- HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@@ -83,13 +130,19 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
+ EntityUtils.consume(response.getEntity());
+
return etag;
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ post.releaseConnection();
}
}
+
+ private static byte[] genCipherKey() {
+ byte[] b = new byte[32];
+ random.nextBytes(b);
+ return b;
+ }
}
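
When the filer reports cipher mode, writeChunk generates a fresh 256-bit key per chunk, encrypts client-side before upload, and stores the key in the chunk metadata so reads can decrypt. A self-contained AES-256-GCM sketch of that shape using javax.crypto; the 12-byte nonce and 128-bit tag here are assumptions, and SeaweedCipher's actual wire layout may differ:

    import javax.crypto.Cipher;
    import javax.crypto.spec.GCMParameterSpec;
    import javax.crypto.spec.SecretKeySpec;
    import java.nio.ByteBuffer;
    import java.security.SecureRandom;

    // Sketch only: nonce size and tag length are assumptions, not SeaweedCipher's spec.
    public class GcmChunkCipherSketch {
        private static final SecureRandom random = new SecureRandom();

        public static byte[] encrypt(byte[] plain, byte[] key) throws Exception {
            byte[] nonce = new byte[12];               // assumed nonce size
            random.nextBytes(nonce);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(128, nonce)); // assumed 128-bit auth tag
            byte[] ct = cipher.doFinal(plain);
            // prepend the nonce so decrypt can recover it
            return ByteBuffer.allocate(nonce.length + ct.length).put(nonce).put(ct).array();
        }

        public static byte[] decrypt(byte[] blob, byte[] key) throws Exception {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(128, blob, 0, 12));
            return cipher.doFinal(blob, 12, blob.length - 12);
        }

        public static void main(String[] args) throws Exception {
            byte[] key = new byte[32];                 // 256-bit per-chunk key
            random.nextBytes(key);
            byte[] roundTrip = decrypt(encrypt("chunk bytes".getBytes(), key), key);
            System.out.println(new String(roundTrip));
        }
    }
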
diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
new file mode 100644
index 000000000..fd2649cc2
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
@@ -0,0 +1,36 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class VolumeIdCache {
+
+ private Cache<String, FilerProto.Locations> cache = null;
+
+ public VolumeIdCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(5, TimeUnit.MINUTES)
+ .build();
+ }
+
+ public FilerProto.Locations getLocations(String volumeId) {
+ if (this.cache == null) {
+ return null;
+ }
+ return this.cache.getIfPresent(volumeId);
+ }
+
+ public void setLocations(String volumeId, FilerProto.Locations locations) {
+ if (this.cache == null) {
+ return;
+ }
+ this.cache.put(volumeId, locations);
+ }
+
+}
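
VolumeIdCache trims lookupVolume round trips: SeaweedRead consults it first and only issues a gRPC lookup for misses, while the five-minute expireAfterAccess bound lets entries refresh if a volume migrates. The miss-then-fill pattern, as a sketch using only classes from this change:

    public class VolumeIdCacheUsage {
        // Sketch: consult the cache first, falling back to a filer lookup on miss.
        static FilerProto.Locations locate(VolumeIdCache cache, FilerClient filerClient, String volumeId) {
            FilerProto.Locations cached = cache.getLocations(volumeId);
            if (cached != null) {
                return cached; // no gRPC round trip
            }
            FilerProto.LookupVolumeResponse resp = filerClient.getBlockingStub().lookupVolume(
                    FilerProto.LookupVolumeRequest.newBuilder().addVolumeIds(volumeId).build());
            FilerProto.Locations found = resp.getLocationsMapMap().get(volumeId);
            if (found != null) {
                cache.setLocations(volumeId, found); // kept for up to five idle minutes
            }
            return found;
        }
    }
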
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index ef847cbe7..ac4c9a0e7 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
package filer_pb;
+option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "FilerProto";
@@ -21,6 +22,9 @@ service SeaweedFiler {
rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) {
}
+ rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) {
+ }
+
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
}
@@ -33,6 +37,9 @@ service SeaweedFiler {
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
}
+ rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
+ }
+
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
}
@@ -42,6 +49,24 @@ service SeaweedFiler {
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
+ rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
+ rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
+ }
+
+ rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
+ }
+
+ rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+ }
+
+ rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -73,6 +98,9 @@ message Entry {
repeated FileChunk chunks = 3;
FuseAttributes attributes = 4;
map<string, bytes> extended = 5;
+ bytes hard_link_id = 7;
+ int32 hard_link_counter = 8; // only exists in hard-link metadata
+ bytes content = 9; // if not empty, the file content
}
message FullEntry {
@@ -85,6 +113,8 @@ message EventNotification {
Entry new_entry = 2;
bool delete_chunks = 3;
string new_parent_path = 4;
+ bool is_from_other_cluster = 5;
+ repeated int32 signatures = 6;
}
message FileChunk {
@@ -96,6 +126,13 @@ message FileChunk {
string source_file_id = 6; // to be deprecated
FileId fid = 7;
FileId source_fid = 8;
+ bytes cipher_key = 9;
+ bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+ repeated FileChunk chunks = 1;
}
message FileId {
@@ -118,23 +155,39 @@ message FuseAttributes {
string user_name = 11; // for hdfs
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
+ bytes md5 = 14;
+ string disk_type = 15;
}
message CreateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool o_excl = 3;
+ bool is_from_other_cluster = 4;
+ repeated int32 signatures = 5;
}
message CreateEntryResponse {
+ string error = 1;
}
message UpdateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool is_from_other_cluster = 3;
+ repeated int32 signatures = 4;
}
message UpdateEntryResponse {
}
+message AppendToEntryRequest {
+ string directory = 1;
+ string entry_name = 2;
+ repeated FileChunk chunks = 3;
+}
+message AppendToEntryResponse {
+}
+
message DeleteEntryRequest {
string directory = 1;
string name = 2;
@@ -142,9 +195,12 @@ message DeleteEntryRequest {
bool is_delete_data = 4;
bool is_recursive = 5;
bool ignore_recursive_error = 6;
+ bool is_from_other_cluster = 7;
+ repeated int32 signatures = 8;
}
message DeleteEntryResponse {
+ string error = 1;
}
message AtomicRenameEntryRequest {
@@ -163,6 +219,9 @@ message AssignVolumeRequest {
string replication = 3;
int32 ttl_sec = 4;
string data_center = 5;
+ string path = 6;
+ string rack = 7;
+ string disk_type = 8;
}
message AssignVolumeResponse {
@@ -171,6 +230,9 @@ message AssignVolumeResponse {
string public_url = 3;
int32 count = 4;
string auth = 5;
+ string collection = 6;
+ string replication = 7;
+ string error = 8;
}
message LookupVolumeRequest {
@@ -189,6 +251,16 @@ message LookupVolumeResponse {
map<string, Locations> locations_map = 1;
}
+message Collection {
+ string name = 1;
+}
+message CollectionListRequest {
+ bool include_normal_volumes = 1;
+ bool include_ec_volumes = 2;
+}
+message CollectionListResponse {
+ repeated Collection collections = 1;
+}
message DeleteCollectionRequest {
string collection = 1;
}
@@ -200,11 +272,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
+ string disk_type = 4;
}
message StatisticsResponse {
- string replication = 1;
- string collection = 2;
- string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@@ -217,4 +287,80 @@ message GetFilerConfigurationResponse {
string replication = 2;
string collection = 3;
uint32 max_mb = 4;
+ string dir_buckets = 5;
+ bool cipher = 7;
+ int32 signature = 8;
+ string metrics_address = 9;
+ int32 metrics_interval_sec = 10;
+}
+
+message SubscribeMetadataRequest {
+ string client_name = 1;
+ string path_prefix = 2;
+ int64 since_ns = 3;
+ int32 signature = 4;
+}
+message SubscribeMetadataResponse {
+ string directory = 1;
+ EventNotification event_notification = 2;
+ int64 ts_ns = 3;
+}
+
+message LogEntry {
+ int64 ts_ns = 1;
+ int32 partition_key_hash = 2;
+ bytes data = 3;
+}
+
+message KeepConnectedRequest {
+ string name = 1;
+ uint32 grpc_port = 2;
+ repeated string resources = 3;
+}
+message KeepConnectedResponse {
+}
+
+message LocateBrokerRequest {
+ string resource = 1;
+}
+message LocateBrokerResponse {
+ bool found = 1;
+ // if found, send the exact address
+ // if not found, send the full list of existing brokers
+ message Resource {
+ string grpc_addresses = 1;
+ int32 resource_count = 2;
+ }
+ repeated Resource resources = 2;
+}
+
+// Key-Value operations
+message KvGetRequest {
+ bytes key = 1;
+}
+message KvGetResponse {
+ bytes value = 1;
+ string error = 2;
+}
+message KvPutRequest {
+ bytes key = 1;
+ bytes value = 2;
+}
+message KvPutResponse {
+ string error = 1;
+}
+
+// path-based configurations
+message FilerConf {
+ int32 version = 1;
+ message PathConf {
+ string location_prefix = 1;
+ string collection = 2;
+ string replication = 3;
+ string ttl = 4;
+ string disk_type = 5;
+ bool fsync = 6;
+ uint32 volume_growth_count = 7;
+ }
+ repeated PathConf locations = 2;
}
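
Among the proto additions, KvGet/KvPut give the filer a small key-value facility. Against the Java stubs protoc generates from this file, a round trip would look roughly like the sketch below; the getBlockingStub() access follows the code earlier in this change, but the kvPut/kvGet method names are assumed from protoc-gen-grpc-java's default naming:

    import com.google.protobuf.ByteString;

    public class FilerKvSketch {
        // Assumes filerClient.getBlockingStub() returns the generated SeaweedFiler blocking stub.
        static void roundTrip(FilerClient filerClient) {
            FilerProto.KvPutResponse put = filerClient.getBlockingStub().kvPut(
                    FilerProto.KvPutRequest.newBuilder()
                            .setKey(ByteString.copyFromUtf8("config/example"))
                            .setValue(ByteString.copyFromUtf8("v1"))
                            .build());
            if (!put.getError().isEmpty()) {
                throw new RuntimeException(put.getError());
            }
            FilerProto.KvGetResponse got = filerClient.getBlockingStub().kvGet(
                    FilerProto.KvGetRequest.newBuilder()
                            .setKey(ByteString.copyFromUtf8("config/example"))
                            .build());
            System.out.println(got.getValue().toStringUtf8());
        }
    }
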
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
new file mode 100644
index 000000000..7b5e53e19
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java
@@ -0,0 +1,42 @@
+package seaweedfs.client;
+
+import org.junit.Test;
+
+import java.util.Base64;
+
+import static seaweedfs.client.SeaweedCipher.decrypt;
+import static seaweedfs.client.SeaweedCipher.encrypt;
+
+public class SeaweedCipherTest {
+
+ @Test
+ public void testSameAsGoImplementation() throws Exception {
+ byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+ @Test
+ public void testEncryptDecrypt() throws Exception {
+ byte[] secretKey = SeaweedCipher.genCipherKey();
+
+ String plainText = "Now we need to generate a 256-bit key for AES 256 GCM";
+
+ System.out.println("Original Text : " + plainText);
+
+ byte[] cipherText = encrypt(plainText.getBytes(), secretKey);
+ System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText));
+
+ byte[] decryptedText = decrypt(cipherText, secretKey);
+ System.out.println("DeCrypted Text : " + new String(decryptedText));
+ }
+
+}
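
Both tests above print the round trip but never assert on it, so a decryption regression would pass silently. A one-line strengthening that fits either test, after the decrypt call:

    import org.junit.Assert;

    // ... after decryptedText is computed:
    Assert.assertEquals(plainText, new String(decryptedText));
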
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
- public void testNonOverlappingVisibleIntervals() {
+ public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}