aboutsummaryrefslogtreecommitdiff
path: root/other/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client')
-rw-r--r--other/java/client/pom.xml2
-rw-r--r--other/java/client/pom.xml.deploy170
-rw-r--r--other/java/client/pom_debug.xml2
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java22
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ChunkCache.java11
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java5
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java4
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java41
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java27
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java20
-rw-r--r--other/java/client/src/main/proto/filer.proto5
11 files changed, 273 insertions, 36 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 05061e0f6..62134715f 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.9</version>
+ <version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
new file mode 100644
index 000000000..62134715f
--- /dev/null
+++ b/other/java/client/pom.xml.deploy
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.3.6</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 1d8454bf7..dcedc2aa6 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.9</version>
+ <version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..897fe9694
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,22 @@
+package seaweedfs.client;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ static List<ByteBuffer> bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ return bufferList.remove(bufferList.size()-1);
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ bufferList.add(obj);
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index e249d4524..58870d742 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -7,9 +7,12 @@ import java.util.concurrent.TimeUnit;
public class ChunkCache {
- private final Cache<String, byte[]> cache;
+ private Cache<String, byte[]> cache = null;
public ChunkCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.expireAfterAccess(1, TimeUnit.HOURS)
@@ -17,10 +20,16 @@ public class ChunkCache {
}
public byte[] getChunk(String fileId) {
+ if (this.cache == null) {
+ return null;
+ }
return this.cache.getIfPresent(fileId);
}
public void setChunk(String fileId, byte[] data) {
+ if (this.cache == null) {
+ return;
+ }
this.cache.put(fileId, data);
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index ef32c7e9a..2103fc699 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -156,7 +156,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
- List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+ List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@@ -173,11 +173,12 @@ public class FilerClient {
return results;
}
- public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
+ .setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3f5d1e8e9..57b67f6b0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -39,8 +39,10 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+ .maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 7be39da53..301919919 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -4,6 +4,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
@@ -18,7 +19,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(1000);
+ static ChunkCache chunkCache = new ChunkCache(16);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -78,7 +79,6 @@ public class SeaweedRead {
private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
- HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -86,20 +86,21 @@ public class SeaweedRead {
byte[] data = null;
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
try {
- HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
data = EntityUtils.toByteArray(entity);
+ EntityUtils.consume(entity);
+
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ request.releaseConnection();
}
- if (chunkView.isGzipped) {
+ if (chunkView.isCompressed) {
data = Gzip.decompress(data);
}
@@ -129,7 +130,7 @@ public class SeaweedRead {
offset,
isFullChunk,
chunk.cipherKey,
- chunk.isGzipped
+ chunk.isCompressed
));
offset = Math.min(chunk.stop, stop);
}
@@ -165,7 +166,7 @@ public class SeaweedRead {
chunk.getMtime(),
true,
chunk.getCipherKey().toByteArray(),
- chunk.getIsGzipped()
+ chunk.getIsCompressed()
);
// easy cases to speed up
@@ -187,7 +188,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
@@ -199,7 +200,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -247,16 +248,16 @@ public class SeaweedRead {
public final String fileId;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -268,7 +269,7 @@ public class SeaweedRead {
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
@@ -280,16 +281,16 @@ public class SeaweedRead {
public final long logicOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -301,7 +302,7 @@ public class SeaweedRead {
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..e2835b718
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,27 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
+ static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+
+ static {
+ // Increase max total connection to 200
+ cm.setMaxTotal(200);
+ // Increase default max connection per route to 20
+ cm.setDefaultMaxPerRoute(20);
+ }
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return HttpClientBuilder.create()
+ .setConnectionManager(cm)
+ .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+ .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+ .build();
+ }
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 18ec77b76..e9819668c 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -3,10 +3,11 @@ package seaweedfs.client;
import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@@ -16,7 +17,7 @@ import java.security.SecureRandom;
public class SeaweedWrite {
- private static SecureRandom random = new SecureRandom();
+ private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
@@ -63,7 +64,7 @@ public class SeaweedWrite {
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory, final FilerProto.Entry.Builder entry) {
- synchronized (entry){
+ synchronized (entry) {
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
@@ -79,8 +80,6 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
- HttpClient client = new DefaultHttpClient();
-
InputStream inputStream = null;
if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@@ -103,8 +102,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
try {
- HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@@ -112,12 +112,12 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
+ EntityUtils.consume(response.getEntity());
+
return etag;
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ post.releaseConnection();
}
}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 37121f29c..dcc18f2a5 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -115,6 +115,11 @@ message FileChunk {
FileId source_fid = 8;
bytes cipher_key = 9;
bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+ repeated FileChunk chunks = 1;
}
message FileId {