diff options
Diffstat (limited to 'other/java/client')
| -rw-r--r-- | other/java/client/pom.xml | 7 | ||||
| -rw-r--r-- | other/java/client/pom.xml.deploy | 2 | ||||
| -rw-r--r-- | other/java/client/pom_debug.xml | 2 | ||||
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/ChunkCache.java | 1 | ||||
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 15 | ||||
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 25 | ||||
| -rw-r--r-- | other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java | 36 | ||||
| -rw-r--r-- | other/java/client/src/main/proto/filer.proto | 20 |
8 files changed, 95 insertions, 13 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index efbf304c4..4bfc5ab8f 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.2</version> + <version>1.5.6</version> <parent> <groupId>org.sonatype.oss</groupId> @@ -68,6 +68,11 @@ <version>4.13.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + <version>1.3.2</version> + </dependency> </dependencies> <distributionManagement> diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 9efc21373..c3c960a28 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.2</version> + <version>1.5.6</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 04ff52730..acdf621a5 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.2</version> + <version>1.5.6</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java index 7afa2dca0..58870d742 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -15,7 +15,6 @@ public class ChunkCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) - .weakValues() .expireAfterAccess(1, TimeUnit.HOURS) .build(); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 035b2c852..7338d5bee 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -275,9 +275,9 @@ public class FilerClient { try { FilerProto.CreateEntryResponse createEntryResponse = filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); if (Strings.isNullOrEmpty(createEntryResponse.getError())) { return true; } @@ -333,4 +333,13 @@ public class FilerClient { return true; } + public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) { + return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + .setPathPrefix(prefix) + .setClientName(clientName) + .setSinceNs(sinceNs) + .build() + ); + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index ab2407dec..2b530d2dd 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -19,6 +19,7 @@ public class SeaweedRead { private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); static ChunkCache chunkCache = new ChunkCache(4); + static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, @@ -27,16 +28,28 @@ public class SeaweedRead { List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); + Map<String, FilerProto.Locations> knownLocations = new HashMap<>(); + FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); for (ChunkView chunkView : chunkViews) { String vid = parseVolumeId(chunkView.fileId); - lookupRequest.addVolumeIds(vid); + FilerProto.Locations locations = volumeIdCache.getLocations(vid); + if (locations == null) { + lookupRequest.addVolumeIds(vid); + } else { + knownLocations.put(vid, locations); + } } - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); - - Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); + if (lookupRequest.getVolumeIdsCount() > 0) { + FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + .getBlockingStub().lookupVolume(lookupRequest.build()); + Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); + for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) { + volumeIdCache.setLocations(entry.getKey(), entry.getValue()); + knownLocations.put(entry.getKey(), entry.getValue()); + } + } //TODO parallel this long readCount = 0; @@ -50,7 +63,7 @@ public class SeaweedRead { startOffset += gap; } - FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); + FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId)); if (locations == null || locations.getLocationsCount() == 0) { LOG.error("failed to locate {}", chunkView.fileId); // log here! diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java new file mode 100644 index 000000000..fd2649cc2 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java @@ -0,0 +1,36 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class VolumeIdCache { + + private Cache<String, FilerProto.Locations> cache = null; + + public VolumeIdCache(int maxEntries) { + if (maxEntries == 0) { + return; + } + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(5, TimeUnit.MINUTES) + .build(); + } + + public FilerProto.Locations getLocations(String volumeId) { + if (this.cache == null) { + return null; + } + return this.cache.getIfPresent(volumeId); + } + + public void setLocations(String volumeId, FilerProto.Locations locations) { + if (this.cache == null) { + return; + } + this.cache.put(volumeId, locations); + } + +} diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 11c29e6ec..f61b62fa3 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -348,3 +348,23 @@ message KvPutRequest { message KvPutResponse { string error = 1; } + +// path-based configurations +message FilerConf { + int32 version = 1; + message PathConf { + string location_prefix = 1; + string collection = 2; + string replication = 3; + string ttl = 4; + enum DiskType { + NONE = 0; + HDD = 1; + SSD = 2; + } + DiskType disk_type = 5; + bool fsync = 6; + uint32 volume_growth_count = 7; + } + repeated PathConf locations = 2; +} |
