aboutsummaryrefslogtreecommitdiff
path: root/other/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client')
-rw-r--r--other/java/client/pom.xml7
-rw-r--r--other/java/client/pom.xml.deploy2
-rw-r--r--other/java/client/pom_debug.xml2
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ChunkCache.java1
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java15
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java25
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java36
-rw-r--r--other/java/client/src/main/proto/filer.proto20
8 files changed, 95 insertions, 13 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index efbf304c4..4bfc5ab8f 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.2</version>
+ <version>1.5.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -68,6 +68,11 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
</dependencies>
<distributionManagement>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index 9efc21373..c3c960a28 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.2</version>
+ <version>1.5.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 04ff52730..acdf621a5 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.2</version>
+ <version>1.5.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index 7afa2dca0..58870d742 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -15,7 +15,6 @@ public class ChunkCache {
}
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
- .weakValues()
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 035b2c852..7338d5bee 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -275,9 +275,9 @@ public class FilerClient {
try {
FilerProto.CreateEntryResponse createEntryResponse =
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
- .setDirectory(parent)
- .setEntry(entry)
- .build());
+ .setDirectory(parent)
+ .setEntry(entry)
+ .build());
if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
return true;
}
@@ -333,4 +333,13 @@ public class FilerClient {
return true;
}
+ public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
+ return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
+ .setPathPrefix(prefix)
+ .setClientName(clientName)
+ .setSinceNs(sinceNs)
+ .build()
+ );
+ }
+
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index ab2407dec..2b530d2dd 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -19,6 +19,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
static ChunkCache chunkCache = new ChunkCache(4);
+ static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -27,16 +28,28 @@ public class SeaweedRead {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+ Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
+
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
for (ChunkView chunkView : chunkViews) {
String vid = parseVolumeId(chunkView.fileId);
- lookupRequest.addVolumeIds(vid);
+ FilerProto.Locations locations = volumeIdCache.getLocations(vid);
+ if (locations == null) {
+ lookupRequest.addVolumeIds(vid);
+ } else {
+ knownLocations.put(vid, locations);
+ }
}
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
- .getBlockingStub().lookupVolume(lookupRequest.build());
-
- Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ if (lookupRequest.getVolumeIdsCount() > 0) {
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) {
+ volumeIdCache.setLocations(entry.getKey(), entry.getValue());
+ knownLocations.put(entry.getKey(), entry.getValue());
+ }
+ }
//TODO parallel this
long readCount = 0;
@@ -50,7 +63,7 @@ public class SeaweedRead {
startOffset += gap;
}
- FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
+ FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId));
if (locations == null || locations.getLocationsCount() == 0) {
LOG.error("failed to locate {}", chunkView.fileId);
// log here!
diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
new file mode 100644
index 000000000..fd2649cc2
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
@@ -0,0 +1,36 @@
+package seaweedfs.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.concurrent.TimeUnit;
+
+public class VolumeIdCache {
+
+ private Cache<String, FilerProto.Locations> cache = null;
+
+ public VolumeIdCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
+ this.cache = CacheBuilder.newBuilder()
+ .maximumSize(maxEntries)
+ .expireAfterAccess(5, TimeUnit.MINUTES)
+ .build();
+ }
+
+ public FilerProto.Locations getLocations(String volumeId) {
+ if (this.cache == null) {
+ return null;
+ }
+ return this.cache.getIfPresent(volumeId);
+ }
+
+ public void setLocations(String volumeId, FilerProto.Locations locations) {
+ if (this.cache == null) {
+ return;
+ }
+ this.cache.put(volumeId, locations);
+ }
+
+}
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 11c29e6ec..f61b62fa3 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -348,3 +348,23 @@ message KvPutRequest {
message KvPutResponse {
string error = 1;
}
+
+// path-based configurations
+message FilerConf {
+ int32 version = 1;
+ message PathConf {
+ string location_prefix = 1;
+ string collection = 2;
+ string replication = 3;
+ string ttl = 4;
+ enum DiskType {
+ NONE = 0;
+ HDD = 1;
+ SSD = 2;
+ }
+ DiskType disk_type = 5;
+ bool fsync = 6;
+ uint32 volume_growth_count = 7;
+ }
+ repeated PathConf locations = 2;
+}