aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-04 22:21:55 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-04 22:21:55 -0800
commit694df8933139a1b498eefb5f2f501e2d5912d58c (patch)
tree7c0b6fb949d4af5fd35d1cfc6a1283b03e9af1fb
parent8c3177d835bb86eed6127b390e2f39ca63ba1a04 (diff)
downloadseaweedfs-694df8933139a1b498eefb5f2f501e2d5912d58c.tar.xz
seaweedfs-694df8933139a1b498eefb5f2f501e2d5912d58c.zip
java: add configurable volume access mode
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java2
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java32
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java16
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java10
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java1
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java10
6 files changed, 60 insertions, 11 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
index 3293db2ca..3d7da91d5 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -74,7 +74,7 @@ public class FileChunkManifest {
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
LOG.debug("doFetchFullChunkData:{}", chunkView);
- chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+ chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations);
}
if (chunk.getIsChunkManifest()){
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 1a719f3c0..8a37827f1 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -15,6 +15,10 @@ import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
+ public final int VOLUME_SERVER_ACCESS_DIRECT = 0;
+ public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1;
+ public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
+
private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class);
static SslContext sslContext;
@@ -34,6 +38,8 @@ public class FilerGrpcClient {
private boolean cipher = false;
private String collection = "";
private String replication = "";
+ private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ private String filerAddress;
public FilerGrpcClient(String host, int grpcPort) {
this(host, grpcPort, sslContext);
@@ -49,6 +55,8 @@ public class FilerGrpcClient {
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
+ filerAddress = String.format("%s:%d", host, grpcPort-10000);
+
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
this.getBlockingStub().getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
@@ -58,7 +66,7 @@ public class FilerGrpcClient {
}
- public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
+ private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
asyncStub = SeaweedFilerGrpc.newStub(channel);
@@ -93,4 +101,26 @@ public class FilerGrpcClient {
return futureStub;
}
+ public void setAccessVolumeServerDirectly() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ }
+ public boolean isAccessVolumeServerDirectly() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT;
+ }
+ public void setAccessVolumeServerByPublicUrl() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+ public boolean isAccessVolumeServerByPublicUrl() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+ public void setAccessVolumeServerByFilerProxy() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+ public boolean isAccessVolumeServerByFilerProxy() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+ public String getFilerAddress() {
+ return this.filerAddress;
+ }
+
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index c45987bed..3df832d7d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -71,7 +71,7 @@ public class SeaweedRead {
return 0;
}
- int len = readChunkView(startOffset, buf, chunkView, locations);
+ int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@@ -93,12 +93,12 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
- chunkData = doFetchFullChunkData(chunkView, locations);
+ chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations);
chunkCache.setChunk(chunkView.fileId, chunkData);
}
@@ -110,12 +110,18 @@ public class SeaweedRead {
return len;
}
- public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] data = null;
IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
for (FilerProto.Location location : locations.getLocationsList()) {
+ String host = location.getUrl();
+ if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) {
+ host = location.getPublicUrl();
+ } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) {
+ host = filerGrpcClient.getFilerAddress();
+ }
String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
try {
data = doFetchOneFullChunkData(chunkView, url);
@@ -145,7 +151,7 @@ public class SeaweedRead {
}
- public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
+ private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
HttpGet request = new HttpGet(url);
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index b8fd3e299..3cc11e21c 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -51,9 +51,15 @@ public class SeaweedWrite {
.setPath(path)
.build());
String fileId = response.getFileId();
- String url = response.getUrl();
String auth = response.getAuth();
- String targetUrl = String.format("http://%s/%s", url, fileId);
+
+ String host = response.getUrl();
+ if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) {
+ host = response.getPublicUrl();
+ } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) {
+ host = filerGrpcClient.getFilerAddress();
+ }
+ String targetUrl = String.format("http://%s/%s", host, fileId);
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 84f11e846..6072d3ec8 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
+ public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volumeServerAccess";
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 223036c13..8147f3efe 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -18,8 +18,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE;
+import static seaweed.hdfs.SeaweedFileSystem.*;
public class SeaweedFileSystemStore {
@@ -34,6 +33,13 @@ public class SeaweedFileSystemStore {
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
filerClient = new FilerClient(filerGrpcClient);
this.conf = conf;
+ String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
+ if (volumeServerAccessMode.equals("publicUrl")) {
+ filerGrpcClient.setAccessVolumeServerByPublicUrl();
+ } else if (volumeServerAccessMode.equals("filerProxy")) {
+ filerGrpcClient.setAccessVolumeServerByFilerProxy();
+ }
+
}
public void close() {