From 13bfe5deefcb83a5a89f9f15e7a15c29f97b1730 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 16 Aug 2020 21:14:39 -0700 Subject: same logic for reading random access files from Go --- .../main/java/seaweedfs/client/SeaweedRead.java | 55 ++++++++++++++++++---- 1 file changed, 45 insertions(+), 10 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index cd2f55678..045751717 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -23,7 +23,7 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength) throws IOException { + final int bufferLength, final long fileSize) throws IOException { List chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); @@ -42,6 +42,14 @@ public class SeaweedRead { long readCount = 0; int startOffset = bufferOffset; for (ChunkView chunkView : chunkViews) { + + if (startOffset < chunkView.logicOffset) { + long gap = chunkView.logicOffset - startOffset; + LOG.debug("zero [{},{})", startOffset, startOffset + gap); + readCount += gap; + startOffset += gap; + } + FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); if (locations == null || locations.getLocationsCount() == 0) { LOG.error("failed to locate {}", chunkView.fileId); @@ -51,11 +59,22 @@ public class SeaweedRead { int len = readChunkView(position, buffer, startOffset, chunkView, locations); + LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); + readCount += len; startOffset += len; } + long limit = Math.min(bufferLength, fileSize); + + if (startOffset < limit) { + long gap = limit - startOffset; + LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); + readCount += gap; + startOffset += gap; + } + return readCount; } @@ -71,7 +90,7 @@ public class SeaweedRead { int len = (int) chunkView.size; LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); - System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); + System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len); return len; } @@ -93,7 +112,7 @@ public class SeaweedRead { Header contentEncodingHeader = entity.getContentEncoding(); if (contentEncodingHeader != null) { - HeaderElement[] encodings =contentEncodingHeader.getElements(); + HeaderElement[] encodings = contentEncodingHeader.getElements(); for (int i = 0; i < encodings.length; i++) { if (encodings[i].getName().equalsIgnoreCase("gzip")) { entity = new GzipDecompressingEntity(entity); @@ -134,18 +153,19 @@ public class SeaweedRead { long stop = offset + size; for (VisibleInterval chunk : visibleIntervals) { - if (chunk.start <= offset && offset < chunk.stop && offset < stop) { + long chunkStart = Math.max(offset, chunk.start); + long chunkStop = Math.min(stop, chunk.stop); + if (chunkStart < chunkStop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, + chunkStart - chunk.start + chunk.chunkOffset, + chunkStop - chunkStart, + chunkStart, isFullChunk, chunk.cipherKey, chunk.isCompressed )); - offset = Math.min(chunk.stop, stop); } } return views; @@ -160,7 +180,13 @@ public class SeaweedRead { Arrays.sort(chunks, new Comparator() { @Override public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - return (int) (a.getMtime() - b.getMtime()); + // if just a.getMtime() - b.getMtime(), it will overflow! + if (a.getMtime() < b.getMtime()) { + return -1; + } else if (a.getMtime() > b.getMtime()) { + return 1; + } + return 0; } }); @@ -181,6 +207,7 @@ public class SeaweedRead { chunk.getOffset() + chunk.getSize(), chunk.getFileId(), chunk.getMtime(), + 0, true, chunk.getCipherKey().toByteArray(), chunk.getIsCompressed() @@ -203,6 +230,7 @@ public class SeaweedRead { chunk.getOffset(), v.fileId, v.modifiedTime, + v.chunkOffset, false, v.cipherKey, v.isCompressed @@ -215,6 +243,7 @@ public class SeaweedRead { v.stop, v.fileId, v.modifiedTime, + v.chunkOffset + (chunkStop - v.start), false, v.cipherKey, v.isCompressed @@ -247,6 +276,10 @@ public class SeaweedRead { return fileId; } + public static long fileSize(FilerProto.Entry entry) { + return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize()); + } + public static long totalSize(List chunksList) { long size = 0; for (FilerProto.FileChunk chunk : chunksList) { @@ -263,15 +296,17 @@ public class SeaweedRead { public final long stop; public final long modifiedTime; public final String fileId; + public final long chunkOffset; public final boolean isFullChunk; public final byte[] cipherKey; public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; + this.chunkOffset = chunkOffset; this.isFullChunk = isFullChunk; this.cipherKey = cipherKey; this.isCompressed = isCompressed; -- cgit v1.2.3 From ca658a97c5248ba099356b006f0b341af53b0816 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 28 Aug 2020 23:48:48 -0700 Subject: add signatures to messages to avoid double processing --- other/java/client/src/main/proto/filer.proto | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index dcc18f2a5..65947e674 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -102,6 +102,7 @@ message EventNotification { bool delete_chunks = 3; string new_parent_path = 4; bool is_from_other_cluster = 5; + repeated int32 signatures = 6; } message FileChunk { @@ -150,6 +151,7 @@ message CreateEntryRequest { Entry entry = 2; bool o_excl = 3; bool is_from_other_cluster = 4; + repeated int32 signatures = 5; } message CreateEntryResponse { @@ -160,6 +162,7 @@ message UpdateEntryRequest { string directory = 1; Entry entry = 2; bool is_from_other_cluster = 3; + repeated int32 signatures = 4; } message UpdateEntryResponse { } @@ -180,6 +183,7 @@ message DeleteEntryRequest { bool is_recursive = 5; bool ignore_recursive_error = 6; bool is_from_other_cluster = 7; + repeated int32 signatures = 8; } message DeleteEntryResponse { @@ -268,6 +272,7 @@ message SubscribeMetadataRequest { string client_name = 1; string path_prefix = 2; int64 since_ns = 3; + int32 signature = 4; } message SubscribeMetadataResponse { string directory = 1; -- cgit v1.2.3 From bba90ff3c822914a8a2da4369e65756ff366cef2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 5 Sep 2020 22:52:15 -0700 Subject: read filer signature --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 65947e674..4d3924bf5 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -266,6 +266,7 @@ message GetFilerConfigurationResponse { uint32 max_mb = 4; string dir_buckets = 5; bool cipher = 7; + int32 signature = 8; } message SubscribeMetadataRequest { -- cgit v1.2.3 From 387ab6796f274151f802ccdab8756b959b5fb1cb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 9 Sep 2020 11:21:23 -0700 Subject: filer: cross cluster synchronization --- other/java/client/src/main/proto/filer.proto | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 4d3924bf5..cf88065ef 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -58,6 +58,12 @@ service SeaweedFiler { rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { } + rpc KvGet (KvGetRequest) returns (KvGetResponse) { + } + + rpc KvPut (KvPutRequest) returns (KvPutResponse) { + } + } ////////////////////////////////////////////////// @@ -308,3 +314,19 @@ message LocateBrokerResponse { } repeated Resource resources = 2; } + +// Key-Value operations +message KvGetRequest { + bytes key = 1; +} +message KvGetResponse { + bytes value = 1; + string error = 2; +} +message KvPutRequest { + bytes key = 1; + bytes value = 2; +} +message KvPutResponse { + string error = 1; +} -- cgit v1.2.3 From cb427d48fa42241b9396fa850e4bf02ecddc21ed Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 17 Sep 2020 06:46:51 -0700 Subject: filer report metrics configuration --- other/java/client/src/main/proto/filer.proto | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index cf88065ef..9a72bc976 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -273,6 +273,8 @@ message GetFilerConfigurationResponse { string dir_buckets = 5; bool cipher = 7; int32 signature = 8; + string metrics_address = 9; + int32 metrics_interval_sec = 10; } message SubscribeMetadataRequest { -- cgit v1.2.3 From 5e239afdfc64ef39c5d4f41ec16410e726614eee Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 24 Sep 2020 03:06:44 -0700 Subject: hardlink works now --- other/java/client/src/main/proto/filer.proto | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 9a72bc976..f0334a377 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -95,6 +95,8 @@ message Entry { repeated FileChunk chunks = 3; FuseAttributes attributes = 4; map extended = 5; + int64 hard_link_id = 7; + int32 hard_link_counter = 8; // only exists in hard link meta data } message FullEntry { -- cgit v1.2.3 From 1012df7bb55341fb7d835269cf9bb7edc6507d25 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 24 Sep 2020 11:11:42 -0700 Subject: switch hardlink id from int64 to bytes --- other/java/client/src/main/proto/filer.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index f0334a377..daa20c378 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -95,7 +95,7 @@ message Entry { repeated FileChunk chunks = 3; FuseAttributes attributes = 4; map extended = 5; - int64 hard_link_id = 7; + bytes hard_link_id = 7; int32 hard_link_counter = 8; // only exists in hard link meta data } -- cgit v1.2.3