aboutsummaryrefslogtreecommitdiff
path: root/other/java/client/src
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2020-07-17 19:49:16 +0800
committerGitHub <noreply@github.com>2020-07-17 19:49:16 +0800
commit64df5207db14ccf7e7915561b5c9b8f3dab53c6e (patch)
treedf4a05999d67abb7c4765d39eddc01318521169b /other/java/client/src
parent8c318470dd95b3fc24d39dc3cf253cc17b03ab39 (diff)
parentf43146b237bc5bbfb7033f6e427b5299554c0824 (diff)
downloadseaweedfs-64df5207db14ccf7e7915561b5c9b8f3dab53c6e.tar.xz
seaweedfs-64df5207db14ccf7e7915561b5c9b8f3dab53c6e.zip
Merge pull request #2 from chrislusf/master
merge
Diffstat (limited to 'other/java/client/src')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java22
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ChunkCache.java11
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java5
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java4
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java41
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java27
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java20
-rw-r--r--other/java/client/src/main/proto/filer.proto5
8 files changed, 101 insertions, 34 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..897fe9694
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,22 @@
+package seaweedfs.client;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ static List<ByteBuffer> bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ return bufferList.remove(bufferList.size()-1);
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ bufferList.add(obj);
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index e249d4524..58870d742 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -7,9 +7,12 @@ import java.util.concurrent.TimeUnit;
public class ChunkCache {
- private final Cache<String, byte[]> cache;
+ private Cache<String, byte[]> cache = null;
public ChunkCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.expireAfterAccess(1, TimeUnit.HOURS)
@@ -17,10 +20,16 @@ public class ChunkCache {
}
public byte[] getChunk(String fileId) {
+ if (this.cache == null) {
+ return null;
+ }
return this.cache.getIfPresent(fileId);
}
public void setChunk(String fileId, byte[] data) {
+ if (this.cache == null) {
+ return;
+ }
this.cache.put(fileId, data);
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index ef32c7e9a..2103fc699 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -156,7 +156,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
- List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+ List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@@ -173,11 +173,12 @@ public class FilerClient {
return results;
}
- public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
+ .setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3f5d1e8e9..57b67f6b0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -39,8 +39,10 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+ .maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 7be39da53..301919919 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -4,6 +4,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
@@ -18,7 +19,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(1000);
+ static ChunkCache chunkCache = new ChunkCache(16);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -78,7 +79,6 @@ public class SeaweedRead {
private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
- HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -86,20 +86,21 @@ public class SeaweedRead {
byte[] data = null;
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
try {
- HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
data = EntityUtils.toByteArray(entity);
+ EntityUtils.consume(entity);
+
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ request.releaseConnection();
}
- if (chunkView.isGzipped) {
+ if (chunkView.isCompressed) {
data = Gzip.decompress(data);
}
@@ -129,7 +130,7 @@ public class SeaweedRead {
offset,
isFullChunk,
chunk.cipherKey,
- chunk.isGzipped
+ chunk.isCompressed
));
offset = Math.min(chunk.stop, stop);
}
@@ -165,7 +166,7 @@ public class SeaweedRead {
chunk.getMtime(),
true,
chunk.getCipherKey().toByteArray(),
- chunk.getIsGzipped()
+ chunk.getIsCompressed()
);
// easy cases to speed up
@@ -187,7 +188,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
@@ -199,7 +200,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -247,16 +248,16 @@ public class SeaweedRead {
public final String fileId;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -268,7 +269,7 @@ public class SeaweedRead {
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
@@ -280,16 +281,16 @@ public class SeaweedRead {
public final long logicOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -301,7 +302,7 @@ public class SeaweedRead {
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..e2835b718
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,27 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
+ static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+
+ static {
+ // Increase max total connection to 200
+ cm.setMaxTotal(200);
+ // Increase default max connection per route to 20
+ cm.setDefaultMaxPerRoute(20);
+ }
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return HttpClientBuilder.create()
+ .setConnectionManager(cm)
+ .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+ .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+ .build();
+ }
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 18ec77b76..e9819668c 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -3,10 +3,11 @@ package seaweedfs.client;
import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@@ -16,7 +17,7 @@ import java.security.SecureRandom;
public class SeaweedWrite {
- private static SecureRandom random = new SecureRandom();
+ private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
@@ -63,7 +64,7 @@ public class SeaweedWrite {
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory, final FilerProto.Entry.Builder entry) {
- synchronized (entry){
+ synchronized (entry) {
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
@@ -79,8 +80,6 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
- HttpClient client = new DefaultHttpClient();
-
InputStream inputStream = null;
if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@@ -103,8 +102,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
try {
- HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@@ -112,12 +112,12 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
+ EntityUtils.consume(response.getEntity());
+
return etag;
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ post.releaseConnection();
}
}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 37121f29c..dcc18f2a5 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -115,6 +115,11 @@ message FileChunk {
FileId source_fid = 8;
bytes cipher_key = 9;
bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+ repeated FileChunk chunks = 1;
}
message FileId {