aboutsummaryrefslogtreecommitdiff
path: root/other/java/client/src
diff options
context:
space:
mode:
authorhasagi <30975629+LIBA-S@users.noreply.github.com>2020-09-22 21:38:38 +0800
committerGitHub <noreply@github.com>2020-09-22 21:38:38 +0800
commitd7bf2390e2bf4ac55132878faa68119b3558e8e4 (patch)
tree48ede45893c2130d3e039f7fe4af8440835eb02d /other/java/client/src
parent37e964d4bd60a9dd792a9cc24f05eaa05d3766f2 (diff)
parentec5b9f1e91a8609d0e70bf9d26dc0840774153c4 (diff)
downloadseaweedfs-d7bf2390e2bf4ac55132878faa68119b3558e8e4.tar.xz
seaweedfs-d7bf2390e2bf4ac55132878faa68119b3558e8e4.zip
Merge pull request #1 from chrislusf/master
catch up
Diffstat (limited to 'other/java/client/src')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ChunkCache.java1
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java23
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java3
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/Gzip.java14
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java85
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java3
-rw-r--r--other/java/client/src/main/proto/filer.proto30
7 files changed, 125 insertions, 34 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index 58870d742..7afa2dca0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -15,6 +15,7 @@ public class ChunkCache {
}
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
+ .weakValues()
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
index d8d29ede8..79e8d9bc4 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -6,7 +6,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
public class FileChunkManifest {
@@ -51,13 +50,17 @@ public class FileChunkManifest {
private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
- FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
String vid = "" + chunk.getFid().getVolumeId();
- lookupRequest.addVolumeIds(vid);
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
- Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
- FilerProto.Locations locations = vid2Locations.get(vid);
+ FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid);
+ if (locations == null) {
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ locations = lookupResponse.getLocationsMapMap().get(vid);
+ filerGrpcClient.vidLocations.put(vid, locations);
+ LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
+ }
SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
@@ -73,8 +76,10 @@ public class FileChunkManifest {
LOG.debug("doFetchFullChunkData:{}", chunkView);
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
}
- LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
- SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+ if (chunk.getIsChunkManifest()){
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+ }
return chunkData;
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 57b67f6b0..1a719f3c0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -9,6 +9,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
@@ -24,6 +26,7 @@ public class FilerGrpcClient {
}
}
+ public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
index 248285dd3..4909094f5 100644
--- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -18,14 +18,18 @@ public class Gzip {
return compressed;
}
- public static byte[] decompress(byte[] compressed) throws IOException {
- ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
- GZIPInputStream gis = new GZIPInputStream(bis);
- return readAll(gis);
+ public static byte[] decompress(byte[] compressed) {
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ return readAll(gis);
+ } catch (Exception e) {
+ return compressed;
+ }
}
private static byte[] readAll(InputStream input) throws IOException {
- try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
byte[] buffer = new byte[4096];
int n;
while (-1 != (n = input.read(buffer))) {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index f0490540d..045751717 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -1,7 +1,10 @@
package seaweedfs.client;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
+import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
@@ -15,12 +18,12 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(0);
+ static ChunkCache chunkCache = new ChunkCache(4);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) throws IOException {
+ final int bufferLength, final long fileSize) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
@@ -39,6 +42,14 @@ public class SeaweedRead {
long readCount = 0;
int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) {
+
+ if (startOffset < chunkView.logicOffset) {
+ long gap = chunkView.logicOffset - startOffset;
+ LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
if (locations == null || locations.getLocationsCount() == 0) {
LOG.error("failed to locate {}", chunkView.fileId);
@@ -48,11 +59,22 @@ public class SeaweedRead {
int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
+
readCount += len;
startOffset += len;
}
+ long limit = Math.min(bufferLength, fileSize);
+
+ if (startOffset < limit) {
+ long gap = limit - startOffset;
+ LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
return readCount;
}
@@ -62,14 +84,13 @@ public class SeaweedRead {
if (chunkData == null) {
chunkData = doFetchFullChunkData(chunkView, locations);
+ chunkCache.setChunk(chunkView.fileId, chunkData);
}
int len = (int) chunkView.size;
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
- System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
-
- chunkCache.setChunk(chunkView.fileId, chunkData);
+ System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len);
return len;
}
@@ -79,7 +100,7 @@ public class SeaweedRead {
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
- request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+ request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
byte[] data = null;
@@ -88,6 +109,18 @@ public class SeaweedRead {
try {
HttpEntity entity = response.getEntity();
+ Header contentEncodingHeader = entity.getContentEncoding();
+
+ if (contentEncodingHeader != null) {
+ HeaderElement[] encodings = contentEncodingHeader.getElements();
+ for (int i = 0; i < encodings.length; i++) {
+ if (encodings[i].getName().equalsIgnoreCase("gzip")) {
+ entity = new GzipDecompressingEntity(entity);
+ break;
+ }
+ }
+ }
+
data = EntityUtils.toByteArray(entity);
EntityUtils.consume(entity);
@@ -97,10 +130,6 @@ public class SeaweedRead {
request.releaseConnection();
}
- if (chunkView.isCompressed) {
- data = Gzip.decompress(data);
- }
-
if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
try {
data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
@@ -109,6 +138,12 @@ public class SeaweedRead {
}
}
+ if (chunkView.isCompressed) {
+ data = Gzip.decompress(data);
+ }
+
+ LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
return data;
}
@@ -118,18 +153,19 @@ public class SeaweedRead {
long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) {
- if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ long chunkStart = Math.max(offset, chunk.start);
+ long chunkStop = Math.min(stop, chunk.stop);
+ if (chunkStart < chunkStop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
+ chunkStart - chunk.start + chunk.chunkOffset,
+ chunkStop - chunkStart,
+ chunkStart,
isFullChunk,
chunk.cipherKey,
chunk.isCompressed
));
- offset = Math.min(chunk.stop, stop);
}
}
return views;
@@ -144,7 +180,13 @@ public class SeaweedRead {
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- return (int) (a.getMtime() - b.getMtime());
+ // if just a.getMtime() - b.getMtime(), it will overflow!
+ if (a.getMtime() < b.getMtime()) {
+ return -1;
+ } else if (a.getMtime() > b.getMtime()) {
+ return 1;
+ }
+ return 0;
}
});
@@ -165,6 +207,7 @@ public class SeaweedRead {
chunk.getOffset() + chunk.getSize(),
chunk.getFileId(),
chunk.getMtime(),
+ 0,
true,
chunk.getCipherKey().toByteArray(),
chunk.getIsCompressed()
@@ -187,6 +230,7 @@ public class SeaweedRead {
chunk.getOffset(),
v.fileId,
v.modifiedTime,
+ v.chunkOffset,
false,
v.cipherKey,
v.isCompressed
@@ -199,6 +243,7 @@ public class SeaweedRead {
v.stop,
v.fileId,
v.modifiedTime,
+ v.chunkOffset + (chunkStop - v.start),
false,
v.cipherKey,
v.isCompressed
@@ -231,6 +276,10 @@ public class SeaweedRead {
return fileId;
}
+ public static long fileSize(FilerProto.Entry entry) {
+ return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ }
+
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
long size = 0;
for (FilerProto.FileChunk chunk : chunksList) {
@@ -247,15 +296,17 @@ public class SeaweedRead {
public final long stop;
public final long modifiedTime;
public final String fileId;
+ public final long chunkOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
+ this.chunkOffset = chunkOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
this.isCompressed = isCompressed;
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 5f4d888bd..d3cecab75 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -61,9 +61,6 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- // cache fileId ~ bytes
- SeaweedRead.chunkCache.setChunk(fileId, bytes);
-
LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
return FilerProto.FileChunk.newBuilder()
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index dcc18f2a5..9a72bc976 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -58,6 +58,12 @@ service SeaweedFiler {
rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
}
+ rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+ }
+
+ rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -102,6 +108,7 @@ message EventNotification {
bool delete_chunks = 3;
string new_parent_path = 4;
bool is_from_other_cluster = 5;
+ repeated int32 signatures = 6;
}
message FileChunk {
@@ -150,6 +157,7 @@ message CreateEntryRequest {
Entry entry = 2;
bool o_excl = 3;
bool is_from_other_cluster = 4;
+ repeated int32 signatures = 5;
}
message CreateEntryResponse {
@@ -160,6 +168,7 @@ message UpdateEntryRequest {
string directory = 1;
Entry entry = 2;
bool is_from_other_cluster = 3;
+ repeated int32 signatures = 4;
}
message UpdateEntryResponse {
}
@@ -180,6 +189,7 @@ message DeleteEntryRequest {
bool is_recursive = 5;
bool ignore_recursive_error = 6;
bool is_from_other_cluster = 7;
+ repeated int32 signatures = 8;
}
message DeleteEntryResponse {
@@ -262,12 +272,16 @@ message GetFilerConfigurationResponse {
uint32 max_mb = 4;
string dir_buckets = 5;
bool cipher = 7;
+ int32 signature = 8;
+ string metrics_address = 9;
+ int32 metrics_interval_sec = 10;
}
message SubscribeMetadataRequest {
string client_name = 1;
string path_prefix = 2;
int64 since_ns = 3;
+ int32 signature = 4;
}
message SubscribeMetadataResponse {
string directory = 1;
@@ -302,3 +316,19 @@ message LocateBrokerResponse {
}
repeated Resource resources = 2;
}
+
+// Key-Value operations
+message KvGetRequest {
+ bytes key = 1;
+}
+message KvGetResponse {
+ bytes value = 1;
+ string error = 2;
+}
+message KvPutRequest {
+ bytes key = 1;
+ bytes value = 2;
+}
+message KvPutResponse {
+ string error = 1;
+}