aboutsummaryrefslogtreecommitdiff
path: root/other/java/client
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
committershibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
commit7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch)
tree5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /other/java/client
parent29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff)
parent53c3aad87528d57343afc5fdb3fb5107544af0fc (diff)
downloadseaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz
seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'other/java/client')
-rw-r--r--other/java/client/pom.xml4
-rw-r--r--other/java/client/pom.xml.deploy170
-rw-r--r--other/java/client/pom_debug.xml4
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java41
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ChunkCache.java12
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java140
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java103
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java7
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/Gzip.java14
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java183
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java30
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java74
-rw-r--r--other/java/client/src/main/proto/filer.proto63
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java5
14 files changed, 733 insertions, 117 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index a8b561251..efbf304c4 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.8</version>
+ <version>1.5.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -65,7 +65,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.12</version>
+ <version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
new file mode 100644
index 000000000..9efc21373
--- /dev/null
+++ b/other/java/client/pom.xml.deploy
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.5.2</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 88447f7e7..04ff52730 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.8</version>
+ <version>1.5.2</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -65,7 +65,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.12</version>
+ <version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..994bcaa2b
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,41 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+ private static final List<ByteBuffer> bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ bufferSize = MIN_BUFFER_SIZE;
+ }
+ LOG.debug("requested new buffer {}", bufferSize);
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+ if (buffer.capacity() >= bufferSize) {
+ return buffer;
+ }
+
+ LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+ bufferList.add(0, buffer);
+ return ByteBuffer.allocate(bufferSize);
+
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ obj.clear();
+ bufferList.add(0, obj);
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index e249d4524..7afa2dca0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -7,20 +7,30 @@ import java.util.concurrent.TimeUnit;
public class ChunkCache {
- private final Cache<String, byte[]> cache;
+ private Cache<String, byte[]> cache = null;
public ChunkCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
+ .weakValues()
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}
public byte[] getChunk(String fileId) {
+ if (this.cache == null) {
+ return null;
+ }
return this.cache.getIfPresent(fileId);
}
public void setChunk(String fileId, byte[] data) {
+ if (this.cache == null) {
+ return;
+ }
this.cache.put(fileId, data);
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..3293db2ca
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,140 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileChunkManifest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+ private static final int mergeFactor = 1000;
+
+ public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (chunk.getIsChunkManifest()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<FilerProto.FileChunk> resolveChunkManifest(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ continue;
+ }
+
+ // IsChunkManifest
+ LOG.debug("fetching chunk manifest:{}", chunk);
+ byte[] data = fetchChunk(filerGrpcClient, chunk);
+ FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+ List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk t : m.getChunksList()) {
+ // avoid deprecated chunk.getFileId()
+ resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+ }
+ dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+ }
+
+ return dataChunks;
+ }
+
+ private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+
+ String vid = "" + chunk.getFid().getVolumeId();
+ FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid);
+ if (locations == null) {
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ locations = lookupResponse.getLocationsMapMap().get(vid);
+ filerGrpcClient.vidLocations.put(vid, locations);
+ LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
+ }
+
+ SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+ FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+ 0,
+ -1,
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed());
+
+ byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+ if (chunkData == null) {
+ LOG.debug("doFetchFullChunkData:{}", chunkView);
+ chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+ }
+ if (chunk.getIsChunkManifest()){
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+ }
+
+ return chunkData;
+
+ }
+
+ public static List<FilerProto.FileChunk> maybeManifestize(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
+ // the return variable
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : inputChunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ } else {
+ chunks.add(chunk);
+ }
+ }
+
+ int remaining = dataChunks.size();
+ for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
+ chunks.add(chunk);
+ remaining -= mergeFactor;
+ }
+
+ // remaining
+ for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+ chunks.add(dataChunks.get(i));
+ }
+ return chunks;
+ }
+
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
+ // create and serialize the manifest
+ dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
+ FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+ byte[] data = m.build().toByteArray();
+
+ long minOffset = Long.MAX_VALUE;
+ long maxOffset = -1;
+ for (FilerProto.FileChunk chunk : dataChunks) {
+ minOffset = Math.min(minOffset, chunk.getOffset());
+ maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+ }
+
+ FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+ filerGrpcClient.getReplication(),
+ filerGrpcClient,
+ minOffset,
+ data, 0, data.length, parentDirectory);
+ manifestChunk.setIsChunkManifest(true);
+ manifestChunk.setSize(maxOffset - minOffset);
+ return manifestChunk.build();
+
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index ef32c7e9a..035b2c852 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -1,5 +1,6 @@
package seaweedfs.client;
+import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,6 +25,67 @@ public class FilerClient {
this.filerGrpcClient = filerGrpcClient;
}
+ public static String toFileId(FilerProto.FileId fid) {
+ if (fid == null) {
+ return null;
+ }
+ return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ }
+
+ public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+ if (fileIdStr == null || fileIdStr.length() == 0) {
+ return null;
+ }
+ int commaIndex = fileIdStr.lastIndexOf(',');
+ String volumeIdStr = fileIdStr.substring(0, commaIndex);
+ String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+ String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+ return FilerProto.FileId.newBuilder()
+ .setVolumeId(Integer.parseInt(volumeIdStr))
+ .setFileKey(Long.parseLong(fileKeyStr, 16))
+ .setCookie((int) Long.parseLong(cookieStr, 16))
+ .build();
+ }
+
+ public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+ List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : chunks) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.clearFileId();
+ chunkBuilder.clearSourceFileId();
+ chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+ FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+ if (sourceFid != null) {
+ chunkBuilder.setSourceFid(sourceFid);
+ }
+ cleanedChunks.add(chunkBuilder.build());
+ }
+ return cleanedChunks;
+ }
+
+ public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.setFileId(toFileId(chunk.getFid()));
+ String sourceFileId = toFileId(chunk.getSourceFid());
+ if (sourceFileId != null) {
+ chunkBuilder.setSourceFileId(sourceFileId);
+ }
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
+ }
+
public boolean mkdirs(String path, int mode) {
String currentUser = System.getProperty("user.name");
return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -156,7 +218,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
- List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+ List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@@ -173,17 +235,18 @@ public class FilerClient {
return results;
}
- public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
+ .setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
- entries.add(fixEntryAfterReading(resp.getEntry()));
+ entries.add(afterEntryDeserialization(resp.getEntry()));
}
return entries;
}
@@ -198,7 +261,7 @@ public class FilerClient {
if (entry == null) {
return null;
}
- return fixEntryAfterReading(entry);
+ return afterEntryDeserialization(entry);
} catch (Exception e) {
if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
@@ -208,18 +271,22 @@ public class FilerClient {
}
}
-
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
- filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
+ FilerProto.CreateEntryResponse createEntryResponse =
+ filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parent)
.setEntry(entry)
.build());
+ if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
+ return true;
+ }
+ LOG.warn("createEntry {}/{} error: {}", parent, entry.getName(), createEntryResponse.getError());
+ return false;
} catch (Exception e) {
LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
return false;
}
- return true;
}
public boolean updateEntry(String parent, FilerProto.Entry entry) {
@@ -229,7 +296,7 @@ public class FilerClient {
.setEntry(entry)
.build());
} catch (Exception e) {
- LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
+ LOG.warn("updateEntry {}/{}: {}", parent, entry.getName(), e);
return false;
}
return true;
@@ -266,24 +333,4 @@ public class FilerClient {
return true;
}
- private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
- if (entry.getChunksList().size() <= 0) {
- return entry;
- }
- String fileId = entry.getChunks(0).getFileId();
- if (fileId != null && fileId.length() != 0) {
- return entry;
- }
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- entryBuilder.clearChunks();
- for (FilerProto.FileChunk chunk : entry.getChunksList()) {
- FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
- FilerProto.FileId fid = chunk.getFid();
- fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
- chunkBuilder.setFileId(fileId);
- entryBuilder.addChunks(chunkBuilder);
- }
- return entryBuilder.build();
- }
-
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3f5d1e8e9..1a719f3c0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -9,6 +9,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
@@ -24,6 +26,7 @@ public class FilerGrpcClient {
}
}
+ public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
@@ -39,8 +42,10 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+ .maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
index 248285dd3..4909094f5 100644
--- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -18,14 +18,18 @@ public class Gzip {
return compressed;
}
- public static byte[] decompress(byte[] compressed) throws IOException {
- ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
- GZIPInputStream gis = new GZIPInputStream(bis);
- return readAll(gis);
+ public static byte[] decompress(byte[] compressed) {
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ return readAll(gis);
+ } catch (Exception e) {
+ return compressed;
+ }
}
private static byte[] readAll(InputStream input) throws IOException {
- try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
byte[] buffer = new byte[4096];
int n;
while (-1 != (n = input.read(buffer))) {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 7be39da53..ab2407dec 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -1,16 +1,16 @@
package seaweedfs.client;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.GzipDecompressingEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.util.*;
@@ -18,12 +18,12 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(1000);
+ static ChunkCache chunkCache = new ChunkCache(4);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) throws IOException {
+ final int bufferLength, final long fileSize) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
@@ -40,67 +40,128 @@ public class SeaweedRead {
//TODO parallel this
long readCount = 0;
- int startOffset = bufferOffset;
+ long startOffset = position;
for (ChunkView chunkView : chunkViews) {
+
+ if (startOffset < chunkView.logicOffset) {
+ long gap = chunkView.logicOffset - startOffset;
+ LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
- if (locations.getLocationsCount() == 0) {
+ if (locations == null || locations.getLocationsCount() == 0) {
+ LOG.error("failed to locate {}", chunkView.fileId);
// log here!
return 0;
}
- int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
+
+ LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
readCount += len;
startOffset += len;
}
+ long limit = Math.min(bufferOffset + bufferLength, fileSize);
+
+ if (startOffset < limit) {
+ long gap = limit - startOffset;
+ LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
return readCount;
}
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
chunkData = doFetchFullChunkData(chunkView, locations);
+ chunkCache.setChunk(chunkView.fileId, chunkData);
}
int len = (int) chunkView.size;
- LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
- System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
-
- chunkCache.setChunk(chunkView.fileId, chunkData);
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset);
+ System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len);
return len;
}
- private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+ byte[] data = null;
+ IOException lastException = null;
+ for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
+ for (FilerProto.Location location : locations.getLocationsList()) {
+ String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
+ try {
+ data = doFetchOneFullChunkData(chunkView, url);
+ lastException = null;
+ break;
+ } catch (IOException ioe) {
+ LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
+ lastException = ioe;
+ }
+ }
+ if (data != null) {
+ break;
+ }
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ if (lastException != null) {
+ throw lastException;
+ }
+
+ LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
+ return data;
+
+ }
+
+ public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
- HttpClient client = new DefaultHttpClient();
- HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ HttpGet request = new HttpGet(url);
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
byte[] data = null;
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
try {
- HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
- data = EntityUtils.toByteArray(entity);
+ Header contentEncodingHeader = entity.getContentEncoding();
- } finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
+ if (contentEncodingHeader != null) {
+ HeaderElement[] encodings = contentEncodingHeader.getElements();
+ for (int i = 0; i < encodings.length; i++) {
+ if (encodings[i].getName().equalsIgnoreCase("gzip")) {
+ entity = new GzipDecompressingEntity(entity);
+ break;
+ }
+ }
}
- }
- if (chunkView.isGzipped) {
- data = Gzip.decompress(data);
+ data = EntityUtils.toByteArray(entity);
+
+ EntityUtils.consume(entity);
+
+ } finally {
+ response.close();
+ request.releaseConnection();
}
if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
@@ -111,6 +172,12 @@ public class SeaweedRead {
}
}
+ if (chunkView.isCompressed) {
+ data = Gzip.decompress(data);
+ }
+
+ LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length);
+
return data;
}
@@ -120,29 +187,40 @@ public class SeaweedRead {
long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) {
- if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ long chunkStart = Math.max(offset, chunk.start);
+ long chunkStop = Math.min(stop, chunk.stop);
+ if (chunkStart < chunkStop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
+ chunkStart - chunk.start + chunk.chunkOffset,
+ chunkStop - chunkStart,
+ chunkStart,
isFullChunk,
chunk.cipherKey,
- chunk.isGzipped
+ chunk.isCompressed
));
- offset = Math.min(chunk.stop, stop);
}
}
return views;
}
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+ chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- return (int) (a.getMtime() - b.getMtime());
+ // if just a.getMtime() - b.getMtime(), it will overflow!
+ if (a.getMtime() < b.getMtime()) {
+ return -1;
+ } else if (a.getMtime() > b.getMtime()) {
+ return 1;
+ }
+ return 0;
}
});
@@ -163,9 +241,10 @@ public class SeaweedRead {
chunk.getOffset() + chunk.getSize(),
chunk.getFileId(),
chunk.getMtime(),
+ 0,
true,
chunk.getCipherKey().toByteArray(),
- chunk.getIsGzipped()
+ chunk.getIsCompressed()
);
// easy cases to speed up
@@ -185,9 +264,10 @@ public class SeaweedRead {
chunk.getOffset(),
v.fileId,
v.modifiedTime,
+ v.chunkOffset,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
@@ -197,9 +277,10 @@ public class SeaweedRead {
v.stop,
v.fileId,
v.modifiedTime,
+ v.chunkOffset + (chunkStop - v.start),
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -229,6 +310,10 @@ public class SeaweedRead {
return fileId;
}
+ public static long fileSize(FilerProto.Entry entry) {
+ return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ }
+
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
long size = 0;
for (FilerProto.FileChunk chunk : chunksList) {
@@ -245,18 +330,20 @@ public class SeaweedRead {
public final long stop;
public final long modifiedTime;
public final String fileId;
+ public final long chunkOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
+ this.chunkOffset = chunkOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -268,7 +355,7 @@ public class SeaweedRead {
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
@@ -280,16 +367,16 @@ public class SeaweedRead {
public final long logicOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -301,7 +388,7 @@ public class SeaweedRead {
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..c465d935f
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,30 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
+ static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+ static CloseableHttpClient httpClient;
+
+ static {
+ // Increase max total connection to 200
+ cm.setMaxTotal(200);
+ // Increase default max connection per route to 20
+ cm.setDefaultMaxPerRoute(20);
+
+ httpClient = HttpClientBuilder.create()
+ .setConnectionManager(cm)
+ .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+ .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+ .build();
+ }
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return httpClient;
+ }
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index dc6203e52..b8fd3e299 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,35 +1,54 @@
package seaweedfs.client;
import com.google.protobuf.ByteString;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
+import java.util.List;
public class SeaweedWrite {
- private static SecureRandom random = new SecureRandom();
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedWrite.class);
+
+ private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
final FilerGrpcClient filerGrpcClient,
final long offset,
final byte[] bytes,
- final long bytesOffset, final long bytesLength) throws IOException {
+ final long bytesOffset, final long bytesLength,
+ final String path) throws IOException {
+ FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
+ replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path);
+ synchronized (entry) {
+ entry.addChunks(chunkBuilder);
+ }
+ }
+
+ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final FilerGrpcClient filerGrpcClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength,
+ final String path) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerGrpcClient.getCollection())
.setReplication(replication == null ? filerGrpcClient.getReplication() : replication)
.setDataCenter("")
.setTtlSec(0)
+ .setPath(path)
.build());
String fileId = response.getFileId();
String url = response.getUrl();
@@ -45,28 +64,32 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- // cache fileId ~ bytes
- SeaweedRead.chunkCache.setChunk(fileId, bytes);
+ LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
- entry.addChunks(FilerProto.FileChunk.newBuilder()
+ return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
.setSize(bytesLength)
.setMtime(System.currentTimeMillis() / 10000L)
.setETag(etag)
- .setCipherKey(cipherKeyString)
- );
-
+ .setCipherKey(cipherKeyString);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
- filerGrpcClient.getBlockingStub().createEntry(
- FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parentDirectory)
- .setEntry(entry)
- .build()
- );
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
+
+ synchronized (entry) {
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory);
+ entry.clearChunks();
+ entry.addAllChunks(chunks);
+ filerGrpcClient.getBlockingStub().createEntry(
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(parentDirectory)
+ .setEntry(entry)
+ .build()
+ );
+ }
}
private static String multipartUpload(String targetUrl,
@@ -75,8 +98,6 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
- HttpClient client = new DefaultHttpClient();
-
InputStream inputStream = null;
if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@@ -99,8 +120,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
try {
- HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@@ -108,12 +130,12 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
+ EntityUtils.consume(response.getEntity());
+
return etag;
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ post.releaseConnection();
}
}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 1fc8ef63d..11c29e6ec 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
package filer_pb;
+option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "FilerProto";
@@ -36,6 +37,9 @@ service SeaweedFiler {
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
}
+ rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
+ }
+
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
}
@@ -48,12 +52,21 @@ service SeaweedFiler {
rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
}
+ rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+ }
+
rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
}
rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
}
+ rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+ }
+
+ rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -85,6 +98,8 @@ message Entry {
repeated FileChunk chunks = 3;
FuseAttributes attributes = 4;
map<string, bytes> extended = 5;
+ bytes hard_link_id = 7;
+ int32 hard_link_counter = 8; // only exists in hard link meta data
}
message FullEntry {
@@ -97,6 +112,8 @@ message EventNotification {
Entry new_entry = 2;
bool delete_chunks = 3;
string new_parent_path = 4;
+ bool is_from_other_cluster = 5;
+ repeated int32 signatures = 6;
}
message FileChunk {
@@ -109,7 +126,12 @@ message FileChunk {
FileId fid = 7;
FileId source_fid = 8;
bytes cipher_key = 9;
- bool is_gzipped = 10;
+ bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+ repeated FileChunk chunks = 1;
}
message FileId {
@@ -139,6 +161,8 @@ message CreateEntryRequest {
string directory = 1;
Entry entry = 2;
bool o_excl = 3;
+ bool is_from_other_cluster = 4;
+ repeated int32 signatures = 5;
}
message CreateEntryResponse {
@@ -148,6 +172,8 @@ message CreateEntryResponse {
message UpdateEntryRequest {
string directory = 1;
Entry entry = 2;
+ bool is_from_other_cluster = 3;
+ repeated int32 signatures = 4;
}
message UpdateEntryResponse {
}
@@ -167,6 +193,8 @@ message DeleteEntryRequest {
bool is_delete_data = 4;
bool is_recursive = 5;
bool ignore_recursive_error = 6;
+ bool is_from_other_cluster = 7;
+ repeated int32 signatures = 8;
}
message DeleteEntryResponse {
@@ -189,7 +217,8 @@ message AssignVolumeRequest {
string replication = 3;
int32 ttl_sec = 4;
string data_center = 5;
- string parent_path = 6;
+ string path = 6;
+ string rack = 7;
}
message AssignVolumeResponse {
@@ -219,6 +248,16 @@ message LookupVolumeResponse {
map<string, Locations> locations_map = 1;
}
+message Collection {
+ string name = 1;
+}
+message CollectionListRequest {
+ bool include_normal_volumes = 1;
+ bool include_ec_volumes = 2;
+}
+message CollectionListResponse {
+ repeated Collection collections = 1;
+}
message DeleteCollectionRequest {
string collection = 1;
}
@@ -249,12 +288,16 @@ message GetFilerConfigurationResponse {
uint32 max_mb = 4;
string dir_buckets = 5;
bool cipher = 7;
+ int32 signature = 8;
+ string metrics_address = 9;
+ int32 metrics_interval_sec = 10;
}
message SubscribeMetadataRequest {
string client_name = 1;
string path_prefix = 2;
int64 since_ns = 3;
+ int32 signature = 4;
}
message SubscribeMetadataResponse {
string directory = 1;
@@ -289,3 +332,19 @@ message LocateBrokerResponse {
}
repeated Resource resources = 2;
}
+
+// Key-Value operations
+message KvGetRequest {
+ bytes key = 1;
+}
+message KvGetResponse {
+ bytes value = 1;
+ string error = 2;
+}
+message KvPutRequest {
+ bytes key = 1;
+ bytes value = 2;
+}
+message KvPutResponse {
+ string error = 1;
+}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
- public void testNonOverlappingVisibleIntervals() {
+ public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}