diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /other/java/client/src | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'other/java/client/src')
12 files changed, 359 insertions, 147 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java index 58870d742..84b2beeb1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -15,7 +15,7 @@ public class ChunkCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) - .expireAfterAccess(1, TimeUnit.HOURS) + .expireAfterWrite(1, TimeUnit.HOURS) .build(); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index e70f6befa..10d263968 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -4,7 +4,6 @@ import com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -15,7 +14,11 @@ public class FilerClient extends FilerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); public FilerClient(String host, int grpcPort) { - super(host, grpcPort); + super(host, grpcPort-10000, grpcPort); + } + + public FilerClient(String host, int port, int grpcPort) { + super(host, port, grpcPort); } public static String toFileId(FilerProto.FileId fid) { @@ -108,9 +111,9 @@ public class FilerClient extends FilerGrpcClient { if ("/".equals(path)) { return true; } - File pathFile = new File(path); - String parent = pathFile.getParent().replace('\\','/'); - String name = pathFile.getName(); + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String name = dirAndName[1]; mkdirs(parent, mode, uid, gid, userName, groupNames); @@ -129,35 +132,32 @@ public class FilerClient extends FilerGrpcClient { public boolean mv(String oldPath, String newPath) { - File oldPathFile = new File(oldPath); - String oldParent = oldPathFile.getParent().replace('\\','/'); - String oldName = oldPathFile.getName(); + String[] oldDirAndName = SeaweedUtil.toDirAndName(oldPath); + String oldParent = oldDirAndName[0]; + String oldName = oldDirAndName[1]; - File newPathFile = new File(newPath); - String newParent = newPathFile.getParent().replace('\\','/'); - String newName = newPathFile.getName(); + String[] newDirAndName = SeaweedUtil.toDirAndName(newPath); + String newParent = newDirAndName[0]; + String newName = newDirAndName[1]; return atomicRenameEntry(oldParent, oldName, newParent, newName); } public boolean exists(String path){ - File pathFile = new File(path); - String parent = pathFile.getParent(); - String entryName = pathFile.getName(); - if(parent == null) { - parent = path; - entryName =""; - } - return lookupEntry(parent, entryName) != null; + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String entryName = dirAndName[1]; + + return lookupEntry(parent, entryName) != null; } public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) { - File pathFile = new File(path); - String parent = pathFile.getParent().replace('\\','/'); - String name = pathFile.getName(); + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String name = dirAndName[1]; return deleteEntry( parent, @@ -168,17 +168,19 @@ public class FilerClient extends FilerGrpcClient { } public boolean touch(String path, int mode) { + String currentUser = System.getProperty("user.name"); long now = System.currentTimeMillis() / 1000L; return touch(path, now, mode, 0, 0, currentUser, new String[]{}); + } public boolean touch(String path, long modifiedTimeSecond, int mode, int uid, int gid, String userName, String[] groupNames) { - File pathFile = new File(path); - String parent = pathFile.getParent().replace('\\','/'); - String name = pathFile.getName(); + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String name = dirAndName[1]; FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { @@ -366,6 +368,7 @@ public class FilerClient extends FilerGrpcClient { .setPathPrefix(prefix) .setClientName(clientName) .setSinceNs(sinceNs) + .setClientId(this.randomClientId) .build() ); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 6c57e2e0d..0a2e6332e 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; import java.util.HashMap; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { @@ -30,6 +31,7 @@ public class FilerGrpcClient { public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>(); + protected int randomClientId; private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; @@ -40,11 +42,11 @@ public class FilerGrpcClient { private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; private String filerAddress; - public FilerGrpcClient(String host, int grpcPort) { - this(host, grpcPort, sslContext); + public FilerGrpcClient(String host, int port, int grpcPort) { + this(host, port, grpcPort, sslContext); } - public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) { + public FilerGrpcClient(String host, int port, int grpcPort, SslContext sslContext) { this(sslContext == null ? ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() @@ -54,7 +56,7 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); - filerAddress = String.format("%s:%d", host, grpcPort - 10000); + filerAddress = SeaweedUtil.joinHostPort(host, port); FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = this.getBlockingStub().getFilerConfiguration( @@ -62,6 +64,7 @@ public class FilerGrpcClient { cipher = filerConfigurationResponse.getCipher(); collection = filerConfigurationResponse.getCollection(); replication = filerConfigurationResponse.getReplication(); + randomClientId = new Random().nextInt(); } diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java new file mode 100644 index 000000000..2eba4f808 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java @@ -0,0 +1,109 @@ +package seaweedfs.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class ReadChunks { + + public static List<SeaweedRead.VisibleInterval> readResolvedChunks(List<FilerProto.FileChunk> chunkList) throws IOException { + List<Point> points = new ArrayList<>(chunkList.size() * 2); + for (FilerProto.FileChunk chunk : chunkList) { + points.add(new Point(chunk.getOffset(), chunk, true)); + points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false)); + } + Collections.sort(points, new Comparator<Point>() { + @Override + public int compare(Point a, Point b) { + int x = (int) (a.x - b.x); + if (a.x != b.x) { + return (int) (a.x - b.x); + } + if (a.ts != b.ts) { + return (int) (a.ts - b.ts); + } + if (!a.isStart) { + return -1; + } + return 1; + } + }); + + long prevX = 0; + List<SeaweedRead.VisibleInterval> visibles = new ArrayList<>(); + ArrayList<Point> queue = new ArrayList<>(); + for (Point point : points) { + if (point.isStart) { + if (queue.size() > 0) { + int lastIndex = queue.size() - 1; + Point lastPoint = queue.get(lastIndex); + if (point.x != prevX && lastPoint.ts < point.ts) { + addToVisibles(visibles, prevX, lastPoint, point); + prevX = point.x; + } + } + // insert into queue + for (int i = queue.size(); i >= 0; i--) { + if (i == 0 || queue.get(i - 1).ts <= point.ts) { + if (i == queue.size()) { + prevX = point.x; + } + queue.add(i, point); + break; + } + } + } else { + int lastIndex = queue.size() - 1; + int index = lastIndex; + Point startPoint = null; + for (; index >= 0; index--) { + startPoint = queue.get(index); + if (startPoint.ts == point.ts) { + queue.remove(index); + break; + } + } + if (index == lastIndex && startPoint != null) { + addToVisibles(visibles, prevX, startPoint, point); + prevX = point.x; + } + } + } + + return visibles; + + } + + private static void addToVisibles(List<SeaweedRead.VisibleInterval> visibles, long prevX, Point startPoint, Point point) { + if (prevX < point.x) { + FilerProto.FileChunk chunk = startPoint.chunk; + visibles.add(new SeaweedRead.VisibleInterval( + prevX, + point.x, + chunk.getFileId(), + chunk.getMtime(), + prevX - chunk.getOffset(), + chunk.getOffset() == prevX && chunk.getSize() == prevX - startPoint.x, + chunk.getCipherKey().toByteArray(), + chunk.getIsCompressed() + )); + } + } + + static class Point { + long x; + long ts; + FilerProto.FileChunk chunk; + boolean isStart; + + public Point(long x, FilerProto.FileChunk chunk, boolean isStart) { + this.x = x; + this.ts = chunk.getMtime(); + this.chunk = chunk; + this.isStart = isStart; + } + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java b/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java index 39c17644b..0d912272b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java +++ b/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java @@ -14,10 +14,10 @@ public class RemoteUtil { String dir = SeaweedOutputStream.getParentDirectory(fullpath); String name = SeaweedOutputStream.getFileName(fullpath); - final FilerProto.DownloadToLocalResponse downloadToLocalResponse = filerClient.getBlockingStub() - .downloadToLocal(FilerProto.DownloadToLocalRequest.newBuilder() + final FilerProto.CacheRemoteObjectToLocalClusterResponse response = filerClient.getBlockingStub() + .cacheRemoteObjectToLocalCluster(FilerProto.CacheRemoteObjectToLocalClusterRequest.newBuilder() .setDirectory(dir).setName(name).build()); - return downloadToLocalResponse.getEntry(); + return response.getEntry(); } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java index 8d0ebd755..979decb8d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -36,7 +36,7 @@ public class SeaweedCipher { byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); byte[] iv = AES_cipherInstance.getIV(); - byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + byte[] message = new byte[GCM_NONCE_LENGTH + length + GCM_TAG_LENGTH]; System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 011462a17..41033befb 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -226,96 +226,8 @@ public class SeaweedRead { chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList); - FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); - Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { - @Override - public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - // if just a.getMtime() - b.getMtime(), it will overflow! - if (a.getMtime() < b.getMtime()) { - return -1; - } else if (a.getMtime() > b.getMtime()) { - return 1; - } - return 0; - } - }); - - List<VisibleInterval> visibles = new ArrayList<>(); - for (FilerProto.FileChunk chunk : chunks) { - List<VisibleInterval> newVisibles = new ArrayList<>(); - visibles = mergeIntoVisibles(visibles, newVisibles, chunk); - } - - return visibles; - } - - private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles, - List<VisibleInterval> newVisibles, - FilerProto.FileChunk chunk) { - VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - 0, - true, - chunk.getCipherKey().toByteArray(), - chunk.getIsCompressed() - ); - - // easy cases to speed up - if (visibles.size() == 0) { - visibles.add(newV); - return visibles; - } - if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) { - visibles.add(newV); - return visibles; - } - - for (VisibleInterval v : visibles) { - if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { - newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - v.chunkOffset, - false, - v.cipherKey, - v.isCompressed - )); - } - long chunkStop = chunk.getOffset() + chunk.getSize(); - if (v.start < chunkStop && chunkStop < v.stop) { - newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - v.chunkOffset + (chunkStop - v.start), - false, - v.cipherKey, - v.isCompressed - )); - } - if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { - newVisibles.add(v); - } - } - newVisibles.add(newV); - - // keep everything sorted - for (int i = newVisibles.size() - 1; i >= 0; i--) { - if (i > 0 && newV.start < newVisibles.get(i - 1).start) { - newVisibles.set(i, newVisibles.get(i - 1)); - } else { - newVisibles.set(i, newV); - break; - } - } + return ReadChunks.readResolvedChunks(chunkList); - return newVisibles; } public static String parseVolumeId(String fileId) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java index c465d935f..027e49b96 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java @@ -27,4 +27,30 @@ public class SeaweedUtil { public static CloseableHttpClient getClosableHttpClient() { return httpClient; } + + public static String[] toDirAndName(String fullpath) { + if (fullpath == null) { + return new String[]{"/", ""}; + } + if (fullpath.endsWith("/")) { + fullpath = fullpath.substring(0, fullpath.length() - 1); + } + if (fullpath.length() == 0) { + return new String[]{"/", ""}; + } + int sep = fullpath.lastIndexOf("/"); + String parent = sep == 0 ? "/" : fullpath.substring(0, sep); + String name = fullpath.substring(sep + 1); + return new String[]{parent, name}; + } + + public static String joinHostPort(String host, int port) { + if (host.startsWith("[") && host.endsWith("]")) { + return host + ":" + port; + } + if (host.indexOf(':')>=0) { + return "[" + host + "]:" + port; + } + return host + ":" + port; + } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 3f9d79b99..1ee745ed0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -29,11 +29,31 @@ public class SeaweedWrite { final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { - FilerProto.FileChunk.Builder chunkBuilder = writeChunk( - replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); - synchronized (entry) { - entry.addChunks(chunkBuilder); + + IOException lastException = null; + for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { + try { + FilerProto.FileChunk.Builder chunkBuilder = writeChunk( + replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); + lastException = null; + synchronized (entry) { + entry.addChunks(chunkBuilder); + } + break; + } catch (IOException ioe) { + LOG.debug("writeData:{}", ioe); + lastException = ioe; + } + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + } + } + + if (lastException != null) { + throw lastException; } + } public static FilerProto.FileChunk.Builder writeChunk(final String replication, @@ -59,7 +79,7 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java index 86263fff9..6f3d9d8c1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java @@ -15,7 +15,7 @@ public class VolumeIdCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) - .expireAfterAccess(5, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) .build(); } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index bb461936c..bd0932cb8 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -30,6 +30,8 @@ service SeaweedFiler { rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { } + rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) { + } rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) { } @@ -46,6 +48,9 @@ service SeaweedFiler { rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { } + rpc Ping (PingRequest) returns (PingResponse) { + } + rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } @@ -67,7 +72,7 @@ service SeaweedFiler { rpc KvPut (KvPutRequest) returns (KvPutResponse) { } - rpc DownloadToLocal (DownloadToLocalRequest) returns (DownloadToLocalResponse) { + rpc CacheRemoteObjectToLocalCluster (CacheRemoteObjectToLocalClusterRequest) returns (CacheRemoteObjectToLocalClusterResponse) { } } @@ -112,6 +117,7 @@ message Entry { bytes content = 9; // if not empty, the file content RemoteEntry remote_entry = 10; + int64 quota = 11; // for bucket only. Positive/Negative means enabled/disabled. } message FullEntry { @@ -160,14 +166,13 @@ message FuseAttributes { uint32 gid = 5; int64 crtime = 6; // unix time in seconds string mime = 7; - string replication = 8; - string collection = 9; int32 ttl_sec = 10; string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; bytes md5 = 14; - string disk_type = 15; + uint32 rdev = 16; + uint64 inode = 17; } message CreateEntryRequest { @@ -176,6 +181,7 @@ message CreateEntryRequest { bool o_excl = 3; bool is_from_other_cluster = 4; repeated int32 signatures = 5; + bool skip_check_parent_directory = 6; } message CreateEntryResponse { @@ -225,6 +231,18 @@ message AtomicRenameEntryRequest { message AtomicRenameEntryResponse { } +message StreamRenameEntryRequest { + string old_directory = 1; + string old_name = 2; + string new_directory = 3; + string new_name = 4; + repeated int32 signatures = 5; +} +message StreamRenameEntryResponse { + string directory = 1; + EventNotification event_notification = 2; + int64 ts_ns = 3; +} message AssignVolumeRequest { int32 count = 1; string collection = 2; @@ -233,18 +251,18 @@ message AssignVolumeRequest { string data_center = 5; string path = 6; string rack = 7; + string data_node = 9; string disk_type = 8; } message AssignVolumeResponse { string file_id = 1; - string url = 2; - string public_url = 3; int32 count = 4; string auth = 5; string collection = 6; string replication = 7; string error = 8; + Location location = 9; } message LookupVolumeRequest { @@ -258,6 +276,7 @@ message Locations { message Location { string url = 1; string public_url = 2; + uint32 grpc_port = 3; } message LookupVolumeResponse { map<string, Locations> locations_map = 1; @@ -292,6 +311,16 @@ message StatisticsResponse { uint64 file_count = 6; } +message PingRequest { + string target = 1; // default to ping itself + string target_type = 2; +} +message PingResponse { + int64 start_time_ns = 1; + int64 remote_time_ns = 2; + int64 stop_time_ns = 3; +} + message GetFilerConfigurationRequest { } message GetFilerConfigurationResponse { @@ -306,6 +335,7 @@ message GetFilerConfigurationResponse { int32 metrics_interval_sec = 10; string version = 11; string cluster_id = 12; + string filer_group = 13; } message SubscribeMetadataRequest { @@ -313,6 +343,9 @@ message SubscribeMetadataRequest { string path_prefix = 2; int64 since_ns = 3; int32 signature = 4; + repeated string path_prefixes = 6; + int32 client_id = 7; + int64 until_ns = 8; } message SubscribeMetadataResponse { string directory = 1; @@ -381,6 +414,9 @@ message FilerConf { bool fsync = 6; uint32 volume_growth_count = 7; bool read_only = 8; + string data_center = 9; + string rack = 10; + string data_node = 11; } repeated PathConf locations = 2; } @@ -388,10 +424,10 @@ message FilerConf { ///////////////////////// // Remote Storage related ///////////////////////// -message DownloadToLocalRequest { +message CacheRemoteObjectToLocalClusterRequest { string directory = 1; string name = 2; } -message DownloadToLocalResponse { +message CacheRemoteObjectToLocalClusterResponse { Entry entry = 1; } diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java index 44b833c90..6ad9edb2c 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; public class SeaweedReadTest { @@ -13,17 +14,17 @@ public class SeaweedReadTest { public void testNonOverlappingVisibleIntervals() throws IOException { List<FilerProto.FileChunk> chunks = new ArrayList<>(); chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("aaa") - .setOffset(0) - .setSize(100) - .setMtime(1000) - .build()); + .setFileId("aaa") + .setOffset(0) + .setSize(100) + .setMtime(1000) + .build()); chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("bbb") - .setOffset(100) - .setSize(133) - .setMtime(2000) - .build()); + .setFileId("bbb") + .setOffset(100) + .setSize(133) + .setMtime(2000) + .build()); List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks); for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { @@ -61,4 +62,106 @@ public class SeaweedReadTest { } + + @Test + public void testReadResolvedChunks() throws IOException { + List<FilerProto.FileChunk> chunks = new ArrayList<>(); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("a") + .setOffset(0) + .setSize(100) + .setMtime(1) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("b") + .setOffset(50) + .setSize(100) + .setMtime(2) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("c") + .setOffset(200) + .setSize(50) + .setMtime(3) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("d") + .setOffset(250) + .setSize(50) + .setMtime(4) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("e") + .setOffset(175) + .setSize(100) + .setMtime(5) + .build()); + + List<SeaweedRead.VisibleInterval> visibleIntervals = ReadChunks.readResolvedChunks(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + } + + Assert.assertEquals(4, visibleIntervals.size()); + + SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); + Assert.assertEquals(visibleInterval.start, 0); + Assert.assertEquals(visibleInterval.stop, 50); + Assert.assertEquals(visibleInterval.modifiedTime, 1); + Assert.assertEquals(visibleInterval.fileId, "a"); + + visibleInterval = visibleIntervals.get(1); + Assert.assertEquals(visibleInterval.start, 50); + Assert.assertEquals(visibleInterval.stop, 150); + Assert.assertEquals(visibleInterval.modifiedTime, 2); + Assert.assertEquals(visibleInterval.fileId, "b"); + + visibleInterval = visibleIntervals.get(2); + Assert.assertEquals(visibleInterval.start, 175); + Assert.assertEquals(visibleInterval.stop, 275); + Assert.assertEquals(visibleInterval.modifiedTime, 5); + Assert.assertEquals(visibleInterval.fileId, "e"); + + visibleInterval = visibleIntervals.get(3); + Assert.assertEquals(visibleInterval.start, 275); + Assert.assertEquals(visibleInterval.stop, 300); + Assert.assertEquals(visibleInterval.modifiedTime, 4); + Assert.assertEquals(visibleInterval.fileId, "d"); + + } + + + @Test + public void testRandomizedReadResolvedChunks() throws IOException { + Random random = new Random(); + int limit = 1024*1024; + long[] array = new long[limit]; + List<FilerProto.FileChunk> chunks = new ArrayList<>(); + for (long ts=0;ts<1024;ts++){ + int x = random.nextInt(limit); + int y = random.nextInt(limit); + int size = Math.min(Math.abs(x-y), 1024); + chunks.add(randomWrite(array, Math.min(x,y), size, ts)); + } + + List<SeaweedRead.VisibleInterval> visibleIntervals = ReadChunks.readResolvedChunks(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + for (int i = (int) visibleInterval.start; i<visibleInterval.stop; i++) { + Assert.assertEquals(array[i], visibleInterval.modifiedTime); + } + } + + } + private FilerProto.FileChunk randomWrite(long[] array, int start, int size, long ts) { + for (int i=start;i<start+size;i++) { + array[i] = ts; + } + return FilerProto.FileChunk.newBuilder() + .setFileId("") + .setOffset(start) + .setSize(size) + .setMtime(ts) + .build(); + } } |
