diff options
Diffstat (limited to 'other/java')
25 files changed, 395 insertions, 185 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 70c5dbd31..33596f2a8 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.6.7</version> + <version>2.85</version> <parent> <groupId>org.sonatype.oss</groupId> @@ -14,7 +14,7 @@ </parent> <properties> - <protobuf.version>3.9.1</protobuf.version> + <protobuf.version>3.16.1</protobuf.version> <!-- follow https://github.com/grpc/grpc-java --> <grpc.version>1.23.0</grpc.version> <guava.version>30.0-jre</guava.version> @@ -60,7 +60,7 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> - <version>4.5.6</version> + <version>4.5.13</version> </dependency> <dependency> <groupId>junit</groupId> @@ -135,7 +135,7 @@ <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.7</version> + <version>1.6.8</version> <extensions>true</extensions> <configuration> <serverId>ossrh</serverId> diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 82cf5e82b..865467cdc 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.6.7</version> + <version>2.85</version> <parent> <groupId>org.sonatype.oss</groupId> @@ -60,7 +60,7 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> - <version>4.5.6</version> + <version>4.5.13</version> </dependency> <dependency> <groupId>junit</groupId> @@ -130,7 +130,7 @@ <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.7</version> + <version>1.6.8</version> <extensions>true</extensions> <configuration> <serverId>ossrh</serverId> diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index c72c81ab7..29c8c459d 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.6.7</version> + <version>2.85</version> <parent> <groupId>org.sonatype.oss</groupId> @@ -60,7 +60,7 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> - <version>4.5.6</version> + <version>4.5.13</version> </dependency> <dependency> <groupId>junit</groupId> diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java index 58870d742..84b2beeb1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -15,7 +15,7 @@ public class ChunkCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) - .expireAfterAccess(1, TimeUnit.HOURS) + .expireAfterWrite(1, TimeUnit.HOURS) .build(); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index e70f6befa..10d263968 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -4,7 +4,6 @@ import com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -15,7 +14,11 @@ public class FilerClient extends FilerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); public FilerClient(String host, int grpcPort) { - super(host, grpcPort); + super(host, grpcPort-10000, grpcPort); + } + + public FilerClient(String host, int port, int grpcPort) { + super(host, port, grpcPort); } public static String toFileId(FilerProto.FileId fid) { @@ -108,9 +111,9 @@ public class FilerClient extends FilerGrpcClient { if ("/".equals(path)) { return true; } - File pathFile = new File(path); - String parent = pathFile.getParent().replace('\\','/'); - String name = pathFile.getName(); + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String name = dirAndName[1]; mkdirs(parent, mode, uid, gid, userName, groupNames); @@ -129,35 +132,32 @@ public class FilerClient extends FilerGrpcClient { public boolean mv(String oldPath, String newPath) { - File oldPathFile = new File(oldPath); - String oldParent = oldPathFile.getParent().replace('\\','/'); - String oldName = oldPathFile.getName(); + String[] oldDirAndName = SeaweedUtil.toDirAndName(oldPath); + String oldParent = oldDirAndName[0]; + String oldName = oldDirAndName[1]; - File newPathFile = new File(newPath); - String newParent = newPathFile.getParent().replace('\\','/'); - String newName = newPathFile.getName(); + String[] newDirAndName = SeaweedUtil.toDirAndName(newPath); + String newParent = newDirAndName[0]; + String newName = newDirAndName[1]; return atomicRenameEntry(oldParent, oldName, newParent, newName); } public boolean exists(String path){ - File pathFile = new File(path); - String parent = pathFile.getParent(); - String entryName = pathFile.getName(); - if(parent == null) { - parent = path; - entryName =""; - } - return lookupEntry(parent, entryName) != null; + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String entryName = dirAndName[1]; + + return lookupEntry(parent, entryName) != null; } public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) { - File pathFile = new File(path); - String parent = pathFile.getParent().replace('\\','/'); - String name = pathFile.getName(); + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String name = dirAndName[1]; return deleteEntry( parent, @@ -168,17 +168,19 @@ public class FilerClient extends FilerGrpcClient { } public boolean touch(String path, int mode) { + String currentUser = System.getProperty("user.name"); long now = System.currentTimeMillis() / 1000L; return touch(path, now, mode, 0, 0, currentUser, new String[]{}); + } public boolean touch(String path, long modifiedTimeSecond, int mode, int uid, int gid, String userName, String[] groupNames) { - File pathFile = new File(path); - String parent = pathFile.getParent().replace('\\','/'); - String name = pathFile.getName(); + String[] dirAndName = SeaweedUtil.toDirAndName(path); + String parent = dirAndName[0]; + String name = dirAndName[1]; FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { @@ -366,6 +368,7 @@ public class FilerClient extends FilerGrpcClient { .setPathPrefix(prefix) .setClientName(clientName) .setSinceNs(sinceNs) + .setClientId(this.randomClientId) .build() ); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 6c57e2e0d..0a2e6332e 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; import java.util.HashMap; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { @@ -30,6 +31,7 @@ public class FilerGrpcClient { public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>(); + protected int randomClientId; private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; @@ -40,11 +42,11 @@ public class FilerGrpcClient { private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; private String filerAddress; - public FilerGrpcClient(String host, int grpcPort) { - this(host, grpcPort, sslContext); + public FilerGrpcClient(String host, int port, int grpcPort) { + this(host, port, grpcPort, sslContext); } - public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) { + public FilerGrpcClient(String host, int port, int grpcPort, SslContext sslContext) { this(sslContext == null ? ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() @@ -54,7 +56,7 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); - filerAddress = String.format("%s:%d", host, grpcPort - 10000); + filerAddress = SeaweedUtil.joinHostPort(host, port); FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = this.getBlockingStub().getFilerConfiguration( @@ -62,6 +64,7 @@ public class FilerGrpcClient { cipher = filerConfigurationResponse.getCipher(); collection = filerConfigurationResponse.getCollection(); replication = filerConfigurationResponse.getReplication(); + randomClientId = new Random().nextInt(); } diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java new file mode 100644 index 000000000..2eba4f808 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java @@ -0,0 +1,109 @@ +package seaweedfs.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class ReadChunks { + + public static List<SeaweedRead.VisibleInterval> readResolvedChunks(List<FilerProto.FileChunk> chunkList) throws IOException { + List<Point> points = new ArrayList<>(chunkList.size() * 2); + for (FilerProto.FileChunk chunk : chunkList) { + points.add(new Point(chunk.getOffset(), chunk, true)); + points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false)); + } + Collections.sort(points, new Comparator<Point>() { + @Override + public int compare(Point a, Point b) { + int x = (int) (a.x - b.x); + if (a.x != b.x) { + return (int) (a.x - b.x); + } + if (a.ts != b.ts) { + return (int) (a.ts - b.ts); + } + if (!a.isStart) { + return -1; + } + return 1; + } + }); + + long prevX = 0; + List<SeaweedRead.VisibleInterval> visibles = new ArrayList<>(); + ArrayList<Point> queue = new ArrayList<>(); + for (Point point : points) { + if (point.isStart) { + if (queue.size() > 0) { + int lastIndex = queue.size() - 1; + Point lastPoint = queue.get(lastIndex); + if (point.x != prevX && lastPoint.ts < point.ts) { + addToVisibles(visibles, prevX, lastPoint, point); + prevX = point.x; + } + } + // insert into queue + for (int i = queue.size(); i >= 0; i--) { + if (i == 0 || queue.get(i - 1).ts <= point.ts) { + if (i == queue.size()) { + prevX = point.x; + } + queue.add(i, point); + break; + } + } + } else { + int lastIndex = queue.size() - 1; + int index = lastIndex; + Point startPoint = null; + for (; index >= 0; index--) { + startPoint = queue.get(index); + if (startPoint.ts == point.ts) { + queue.remove(index); + break; + } + } + if (index == lastIndex && startPoint != null) { + addToVisibles(visibles, prevX, startPoint, point); + prevX = point.x; + } + } + } + + return visibles; + + } + + private static void addToVisibles(List<SeaweedRead.VisibleInterval> visibles, long prevX, Point startPoint, Point point) { + if (prevX < point.x) { + FilerProto.FileChunk chunk = startPoint.chunk; + visibles.add(new SeaweedRead.VisibleInterval( + prevX, + point.x, + chunk.getFileId(), + chunk.getMtime(), + prevX - chunk.getOffset(), + chunk.getOffset() == prevX && chunk.getSize() == prevX - startPoint.x, + chunk.getCipherKey().toByteArray(), + chunk.getIsCompressed() + )); + } + } + + static class Point { + long x; + long ts; + FilerProto.FileChunk chunk; + boolean isStart; + + public Point(long x, FilerProto.FileChunk chunk, boolean isStart) { + this.x = x; + this.ts = chunk.getMtime(); + this.chunk = chunk; + this.isStart = isStart; + } + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java b/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java index 39c17644b..0d912272b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java +++ b/other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java @@ -14,10 +14,10 @@ public class RemoteUtil { String dir = SeaweedOutputStream.getParentDirectory(fullpath); String name = SeaweedOutputStream.getFileName(fullpath); - final FilerProto.DownloadToLocalResponse downloadToLocalResponse = filerClient.getBlockingStub() - .downloadToLocal(FilerProto.DownloadToLocalRequest.newBuilder() + final FilerProto.CacheRemoteObjectToLocalClusterResponse response = filerClient.getBlockingStub() + .cacheRemoteObjectToLocalCluster(FilerProto.CacheRemoteObjectToLocalClusterRequest.newBuilder() .setDirectory(dir).setName(name).build()); - return downloadToLocalResponse.getEntry(); + return response.getEntry(); } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java index 8d0ebd755..979decb8d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -36,7 +36,7 @@ public class SeaweedCipher { byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); byte[] iv = AES_cipherInstance.getIV(); - byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + byte[] message = new byte[GCM_NONCE_LENGTH + length + GCM_TAG_LENGTH]; System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 011462a17..41033befb 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -226,96 +226,8 @@ public class SeaweedRead { chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList); - FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); - Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { - @Override - public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - // if just a.getMtime() - b.getMtime(), it will overflow! - if (a.getMtime() < b.getMtime()) { - return -1; - } else if (a.getMtime() > b.getMtime()) { - return 1; - } - return 0; - } - }); - - List<VisibleInterval> visibles = new ArrayList<>(); - for (FilerProto.FileChunk chunk : chunks) { - List<VisibleInterval> newVisibles = new ArrayList<>(); - visibles = mergeIntoVisibles(visibles, newVisibles, chunk); - } - - return visibles; - } - - private static List<VisibleInterval> mergeIntoVisibles(List<VisibleInterval> visibles, - List<VisibleInterval> newVisibles, - FilerProto.FileChunk chunk) { - VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - 0, - true, - chunk.getCipherKey().toByteArray(), - chunk.getIsCompressed() - ); - - // easy cases to speed up - if (visibles.size() == 0) { - visibles.add(newV); - return visibles; - } - if (visibles.get(visibles.size() - 1).stop <= chunk.getOffset()) { - visibles.add(newV); - return visibles; - } - - for (VisibleInterval v : visibles) { - if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { - newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - v.chunkOffset, - false, - v.cipherKey, - v.isCompressed - )); - } - long chunkStop = chunk.getOffset() + chunk.getSize(); - if (v.start < chunkStop && chunkStop < v.stop) { - newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - v.chunkOffset + (chunkStop - v.start), - false, - v.cipherKey, - v.isCompressed - )); - } - if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { - newVisibles.add(v); - } - } - newVisibles.add(newV); - - // keep everything sorted - for (int i = newVisibles.size() - 1; i >= 0; i--) { - if (i > 0 && newV.start < newVisibles.get(i - 1).start) { - newVisibles.set(i, newVisibles.get(i - 1)); - } else { - newVisibles.set(i, newV); - break; - } - } + return ReadChunks.readResolvedChunks(chunkList); - return newVisibles; } public static String parseVolumeId(String fileId) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java index c465d935f..027e49b96 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java @@ -27,4 +27,30 @@ public class SeaweedUtil { public static CloseableHttpClient getClosableHttpClient() { return httpClient; } + + public static String[] toDirAndName(String fullpath) { + if (fullpath == null) { + return new String[]{"/", ""}; + } + if (fullpath.endsWith("/")) { + fullpath = fullpath.substring(0, fullpath.length() - 1); + } + if (fullpath.length() == 0) { + return new String[]{"/", ""}; + } + int sep = fullpath.lastIndexOf("/"); + String parent = sep == 0 ? "/" : fullpath.substring(0, sep); + String name = fullpath.substring(sep + 1); + return new String[]{parent, name}; + } + + public static String joinHostPort(String host, int port) { + if (host.startsWith("[") && host.endsWith("]")) { + return host + ":" + port; + } + if (host.indexOf(':')>=0) { + return "[" + host + "]:" + port; + } + return host + ":" + port; + } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 3f9d79b99..1ee745ed0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -29,11 +29,31 @@ public class SeaweedWrite { final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { - FilerProto.FileChunk.Builder chunkBuilder = writeChunk( - replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); - synchronized (entry) { - entry.addChunks(chunkBuilder); + + IOException lastException = null; + for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { + try { + FilerProto.FileChunk.Builder chunkBuilder = writeChunk( + replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); + lastException = null; + synchronized (entry) { + entry.addChunks(chunkBuilder); + } + break; + } catch (IOException ioe) { + LOG.debug("writeData:{}", ioe); + lastException = ioe; + } + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + } + } + + if (lastException != null) { + throw lastException; } + } public static FilerProto.FileChunk.Builder writeChunk(final String replication, @@ -59,7 +79,7 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java index 86263fff9..6f3d9d8c1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java @@ -15,7 +15,7 @@ public class VolumeIdCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) - .expireAfterAccess(5, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) .build(); } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index bb461936c..bd0932cb8 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -30,6 +30,8 @@ service SeaweedFiler { rpc AtomicRenameEntry (AtomicRenameEntryRequest) returns (AtomicRenameEntryResponse) { } + rpc StreamRenameEntry (StreamRenameEntryRequest) returns (stream StreamRenameEntryResponse) { + } rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) { } @@ -46,6 +48,9 @@ service SeaweedFiler { rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { } + rpc Ping (PingRequest) returns (PingResponse) { + } + rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } @@ -67,7 +72,7 @@ service SeaweedFiler { rpc KvPut (KvPutRequest) returns (KvPutResponse) { } - rpc DownloadToLocal (DownloadToLocalRequest) returns (DownloadToLocalResponse) { + rpc CacheRemoteObjectToLocalCluster (CacheRemoteObjectToLocalClusterRequest) returns (CacheRemoteObjectToLocalClusterResponse) { } } @@ -112,6 +117,7 @@ message Entry { bytes content = 9; // if not empty, the file content RemoteEntry remote_entry = 10; + int64 quota = 11; // for bucket only. Positive/Negative means enabled/disabled. } message FullEntry { @@ -160,14 +166,13 @@ message FuseAttributes { uint32 gid = 5; int64 crtime = 6; // unix time in seconds string mime = 7; - string replication = 8; - string collection = 9; int32 ttl_sec = 10; string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; bytes md5 = 14; - string disk_type = 15; + uint32 rdev = 16; + uint64 inode = 17; } message CreateEntryRequest { @@ -176,6 +181,7 @@ message CreateEntryRequest { bool o_excl = 3; bool is_from_other_cluster = 4; repeated int32 signatures = 5; + bool skip_check_parent_directory = 6; } message CreateEntryResponse { @@ -225,6 +231,18 @@ message AtomicRenameEntryRequest { message AtomicRenameEntryResponse { } +message StreamRenameEntryRequest { + string old_directory = 1; + string old_name = 2; + string new_directory = 3; + string new_name = 4; + repeated int32 signatures = 5; +} +message StreamRenameEntryResponse { + string directory = 1; + EventNotification event_notification = 2; + int64 ts_ns = 3; +} message AssignVolumeRequest { int32 count = 1; string collection = 2; @@ -233,18 +251,18 @@ message AssignVolumeRequest { string data_center = 5; string path = 6; string rack = 7; + string data_node = 9; string disk_type = 8; } message AssignVolumeResponse { string file_id = 1; - string url = 2; - string public_url = 3; int32 count = 4; string auth = 5; string collection = 6; string replication = 7; string error = 8; + Location location = 9; } message LookupVolumeRequest { @@ -258,6 +276,7 @@ message Locations { message Location { string url = 1; string public_url = 2; + uint32 grpc_port = 3; } message LookupVolumeResponse { map<string, Locations> locations_map = 1; @@ -292,6 +311,16 @@ message StatisticsResponse { uint64 file_count = 6; } +message PingRequest { + string target = 1; // default to ping itself + string target_type = 2; +} +message PingResponse { + int64 start_time_ns = 1; + int64 remote_time_ns = 2; + int64 stop_time_ns = 3; +} + message GetFilerConfigurationRequest { } message GetFilerConfigurationResponse { @@ -306,6 +335,7 @@ message GetFilerConfigurationResponse { int32 metrics_interval_sec = 10; string version = 11; string cluster_id = 12; + string filer_group = 13; } message SubscribeMetadataRequest { @@ -313,6 +343,9 @@ message SubscribeMetadataRequest { string path_prefix = 2; int64 since_ns = 3; int32 signature = 4; + repeated string path_prefixes = 6; + int32 client_id = 7; + int64 until_ns = 8; } message SubscribeMetadataResponse { string directory = 1; @@ -381,6 +414,9 @@ message FilerConf { bool fsync = 6; uint32 volume_growth_count = 7; bool read_only = 8; + string data_center = 9; + string rack = 10; + string data_node = 11; } repeated PathConf locations = 2; } @@ -388,10 +424,10 @@ message FilerConf { ///////////////////////// // Remote Storage related ///////////////////////// -message DownloadToLocalRequest { +message CacheRemoteObjectToLocalClusterRequest { string directory = 1; string name = 2; } -message DownloadToLocalResponse { +message CacheRemoteObjectToLocalClusterResponse { Entry entry = 1; } diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java index 44b833c90..6ad9edb2c 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; public class SeaweedReadTest { @@ -13,17 +14,17 @@ public class SeaweedReadTest { public void testNonOverlappingVisibleIntervals() throws IOException { List<FilerProto.FileChunk> chunks = new ArrayList<>(); chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("aaa") - .setOffset(0) - .setSize(100) - .setMtime(1000) - .build()); + .setFileId("aaa") + .setOffset(0) + .setSize(100) + .setMtime(1000) + .build()); chunks.add(FilerProto.FileChunk.newBuilder() - .setFileId("bbb") - .setOffset(100) - .setSize(133) - .setMtime(2000) - .build()); + .setFileId("bbb") + .setOffset(100) + .setSize(133) + .setMtime(2000) + .build()); List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks); for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { @@ -61,4 +62,106 @@ public class SeaweedReadTest { } + + @Test + public void testReadResolvedChunks() throws IOException { + List<FilerProto.FileChunk> chunks = new ArrayList<>(); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("a") + .setOffset(0) + .setSize(100) + .setMtime(1) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("b") + .setOffset(50) + .setSize(100) + .setMtime(2) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("c") + .setOffset(200) + .setSize(50) + .setMtime(3) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("d") + .setOffset(250) + .setSize(50) + .setMtime(4) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("e") + .setOffset(175) + .setSize(100) + .setMtime(5) + .build()); + + List<SeaweedRead.VisibleInterval> visibleIntervals = ReadChunks.readResolvedChunks(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + } + + Assert.assertEquals(4, visibleIntervals.size()); + + SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); + Assert.assertEquals(visibleInterval.start, 0); + Assert.assertEquals(visibleInterval.stop, 50); + Assert.assertEquals(visibleInterval.modifiedTime, 1); + Assert.assertEquals(visibleInterval.fileId, "a"); + + visibleInterval = visibleIntervals.get(1); + Assert.assertEquals(visibleInterval.start, 50); + Assert.assertEquals(visibleInterval.stop, 150); + Assert.assertEquals(visibleInterval.modifiedTime, 2); + Assert.assertEquals(visibleInterval.fileId, "b"); + + visibleInterval = visibleIntervals.get(2); + Assert.assertEquals(visibleInterval.start, 175); + Assert.assertEquals(visibleInterval.stop, 275); + Assert.assertEquals(visibleInterval.modifiedTime, 5); + Assert.assertEquals(visibleInterval.fileId, "e"); + + visibleInterval = visibleIntervals.get(3); + Assert.assertEquals(visibleInterval.start, 275); + Assert.assertEquals(visibleInterval.stop, 300); + Assert.assertEquals(visibleInterval.modifiedTime, 4); + Assert.assertEquals(visibleInterval.fileId, "d"); + + } + + + @Test + public void testRandomizedReadResolvedChunks() throws IOException { + Random random = new Random(); + int limit = 1024*1024; + long[] array = new long[limit]; + List<FilerProto.FileChunk> chunks = new ArrayList<>(); + for (long ts=0;ts<1024;ts++){ + int x = random.nextInt(limit); + int y = random.nextInt(limit); + int size = Math.min(Math.abs(x-y), 1024); + chunks.add(randomWrite(array, Math.min(x,y), size, ts)); + } + + List<SeaweedRead.VisibleInterval> visibleIntervals = ReadChunks.readResolvedChunks(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + for (int i = (int) visibleInterval.start; i<visibleInterval.stop; i++) { + Assert.assertEquals(array[i], visibleInterval.modifiedTime); + } + } + + } + private FilerProto.FileChunk randomWrite(long[] array, int start, int size, long ts) { + for (int i=start;i<start+size;i++) { + array[i] = ts; + } + return FilerProto.FileChunk.newBuilder() + .setFileId("") + .setOffset(start) + .setSize(size) + .setMtime(ts) + .build(); + } } diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml index 26c9bdfdc..c0927559a 100644 --- a/other/java/examples/pom.xml +++ b/other/java/examples/pom.xml @@ -11,22 +11,22 @@ <dependency> <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.6.7</version> + <version>2.85</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-hadoop2-client</artifactId> - <version>1.6.7</version> + <version>2.85</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - <version>2.9.2</version> + <version>2.10.1</version> <scope>compile</scope> </dependency> </dependencies> -</project>
\ No newline at end of file +</project> diff --git a/other/java/hdfs-over-ftp/pom.xml b/other/java/hdfs-over-ftp/pom.xml index 0db422db5..8b4f0e612 100644 --- a/other/java/hdfs-over-ftp/pom.xml +++ b/other/java/hdfs-over-ftp/pom.xml @@ -36,7 +36,7 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - <version>3.2.1</version> + <version>3.2.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index bd31637ce..74f8e6240 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -86,7 +86,7 @@ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
- <version>1.6.7</version>
+ <version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
@@ -301,7 +301,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.6.7</seaweedfs.client.version>
+ <seaweedfs.client.version>2.85</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index f15d24faa..eccbb54bf 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,8 +5,8 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.6.7</seaweedfs.client.version> - <hadoop.version>2.9.2</hadoop.version> + <seaweedfs.client.version>2.85</seaweedfs.client.version> + <hadoop.version>2.10.1</hadoop.version> </properties> <groupId>com.github.chrislusf</groupId> @@ -105,7 +105,7 @@ <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.7</version> + <version>1.6.8</version> <extensions>true</extensions> <configuration> <serverId>ossrh</serverId> diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 25395db7a..b6ea4c3bb 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -23,6 +23,7 @@ public class SeaweedFileSystem extends FileSystem { public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final String FS_SEAWEED_FILER_PORT_GRPC = "fs.seaweed.filer.port.grpc"; public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; @@ -50,9 +51,6 @@ public class SeaweedFileSystem extends FileSystem { // get host information from uri (overrides info in conf) String host = uri.getHost(); host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host; - if (host == null) { - throw new IOException("Invalid host specified"); - } conf.set(FS_SEAWEED_FILER_HOST, host); // get port information from uri, (overrides info in conf) @@ -60,10 +58,12 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); + int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000); + setConf(conf); this.uri = uri; - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf); + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, grpcPort, conf); } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index f4e8c9349..2ba8e1a10 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -27,9 +27,8 @@ public class SeaweedFileSystemStore { private FilerClient filerClient; private Configuration conf; - public SeaweedFileSystemStore(String host, int port, Configuration conf) { - int grpcPort = 10000 + port; - filerClient = new FilerClient(host, grpcPort); + public SeaweedFileSystemStore(String host, int port, int grpcPort, Configuration conf) { + filerClient = new FilerClient(host, port, grpcPort); this.conf = conf; String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); if (volumeServerAccessMode.equals("publicUrl")) { diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 4640b5a84..bbfd48ab9 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -86,7 +86,7 @@ <plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
- <version>1.6.7</version>
+ <version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
@@ -309,7 +309,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.6.7</seaweedfs.client.version>
+ <seaweedfs.client.version>2.85</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index efcc1e4c0..f25ecf986 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,8 +5,8 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.6.7</seaweedfs.client.version> - <hadoop.version>3.1.1</hadoop.version> + <seaweedfs.client.version>2.85</seaweedfs.client.version> + <hadoop.version>3.2.3</hadoop.version> </properties> <groupId>com.github.chrislusf</groupId> @@ -105,7 +105,7 @@ <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.7</version> + <version>1.6.8</version> <extensions>true</extensions> <configuration> <serverId>ossrh</serverId> diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 25395db7a..b6ea4c3bb 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -23,6 +23,7 @@ public class SeaweedFileSystem extends FileSystem { public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final String FS_SEAWEED_FILER_PORT_GRPC = "fs.seaweed.filer.port.grpc"; public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; @@ -50,9 +51,6 @@ public class SeaweedFileSystem extends FileSystem { // get host information from uri (overrides info in conf) String host = uri.getHost(); host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host; - if (host == null) { - throw new IOException("Invalid host specified"); - } conf.set(FS_SEAWEED_FILER_HOST, host); // get port information from uri, (overrides info in conf) @@ -60,10 +58,12 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); + int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000); + setConf(conf); this.uri = uri; - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf); + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, grpcPort, conf); } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index f4e8c9349..2ba8e1a10 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -27,9 +27,8 @@ public class SeaweedFileSystemStore { private FilerClient filerClient; private Configuration conf; - public SeaweedFileSystemStore(String host, int port, Configuration conf) { - int grpcPort = 10000 + port; - filerClient = new FilerClient(host, grpcPort); + public SeaweedFileSystemStore(String host, int port, int grpcPort, Configuration conf) { + filerClient = new FilerClient(host, port, grpcPort); this.conf = conf; String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); if (volumeServerAccessMode.equals("publicUrl")) { |
