aboutsummaryrefslogtreecommitdiff
path: root/other/java/client/src
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-07-20 03:29:17 -0700
committerChris Lu <chris.lu@gmail.com>2020-07-20 03:29:17 -0700
commit1d724ab23755704f2e5058f9c81e1de134fb5fb0 (patch)
tree84d68903b0f8c828c30a76d7e2111131c14e3edd /other/java/client/src
parent60d14a9800d8513ae0849a764d66ce80795dadbe (diff)
downloadseaweedfs-1d724ab23755704f2e5058f9c81e1de134fb5fb0.tar.xz
seaweedfs-1d724ab23755704f2e5058f9c81e1de134fb5fb0.zip
hdfs: support read write chunk manifest
Diffstat (limited to 'other/java/client/src')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java134
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java9
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java12
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java42
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java5
5 files changed, 175 insertions, 27 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..3cac46db9
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,134 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileChunkManifest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+ private static final int mergeFactor = 3;
+
+ public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (chunk.getIsChunkManifest()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<FilerProto.FileChunk> resolveChunkManifest(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ continue;
+ }
+
+ // IsChunkManifest
+ LOG.debug("fetching chunk manifest:{}", chunk);
+ byte[] data = fetchChunk(filerGrpcClient, chunk);
+ FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+ List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk t : m.getChunksList()) {
+ // avoid deprecated chunk.getFileId()
+ resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+ }
+ dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+ }
+
+ return dataChunks;
+ }
+
+ private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ String vid = "" + chunk.getFid().getVolumeId();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ FilerProto.Locations locations = vid2Locations.get(vid);
+
+ SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+ FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+ 0,
+ -1,
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed());
+
+ byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+ if (chunkData == null) {
+ LOG.debug("doFetchFullChunkData:{}", chunkView);
+ chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+ }
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+
+ return chunkData;
+
+ }
+
+ public static List<FilerProto.FileChunk> maybeManifestize(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks) throws IOException {
+ // the return variable
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : inputChunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ } else {
+ chunks.add(chunk);
+ }
+ }
+
+ int remaining = dataChunks.size();
+ for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor));
+ chunks.add(chunk);
+ remaining -= mergeFactor;
+ }
+
+ // remaining
+ for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+ chunks.add(dataChunks.get(i));
+ }
+ return chunks;
+ }
+
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException {
+ // create and serialize the manifest
+ FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+ byte[] data = m.build().toByteArray();
+
+ long minOffset = Long.MAX_VALUE;
+ long maxOffset = -1;
+ for (FilerProto.FileChunk chunk : dataChunks) {
+ minOffset = Math.min(minOffset, chunk.getOffset());
+ maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+ }
+
+ FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+ filerGrpcClient.getReplication(),
+ filerGrpcClient,
+ minOffset,
+ data, 0, data.length);
+ manifestChunk.setIsChunkManifest(true);
+ manifestChunk.setSize(maxOffset - minOffset);
+ return manifestChunk.build();
+
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 2103fc699..0d087d5b4 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -24,6 +24,10 @@ public class FilerClient {
this.filerGrpcClient = filerGrpcClient;
}
+ public static String toFileId(FilerProto.FileId fid) {
+ return String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ }
+
public boolean mkdirs(String path, int mode) {
String currentUser = System.getProperty("user.name");
return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -209,7 +213,6 @@ public class FilerClient {
}
}
-
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
@@ -279,9 +282,7 @@ public class FilerClient {
entryBuilder.clearChunks();
for (FilerProto.FileChunk chunk : entry.getChunksList()) {
FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
- FilerProto.FileId fid = chunk.getFid();
- fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
- chunkBuilder.setFileId(fileId);
+ chunkBuilder.setFileId(toFileId(chunk.getFid()));
entryBuilder.addChunks(chunkBuilder);
}
return entryBuilder.build();
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 301919919..fbc3296ae 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -2,16 +2,12 @@ package seaweedfs.client;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.util.*;
@@ -77,7 +73,7 @@ public class SeaweedRead {
return len;
}
- private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -138,7 +134,11 @@ public class SeaweedRead {
return views;
}
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+ chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index e9819668c..fd54453a1 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,8 +1,6 @@
package seaweedfs.client;
import com.google.protobuf.ByteString;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -10,10 +8,10 @@ import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
+import java.util.List;
public class SeaweedWrite {
@@ -25,6 +23,17 @@ public class SeaweedWrite {
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
+ synchronized (entry) {
+ entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength));
+ }
+ }
+
+ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final FilerGrpcClient filerGrpcClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerGrpcClient.getCollection())
@@ -46,25 +55,28 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- synchronized (entry) {
- entry.addChunks(FilerProto.FileChunk.newBuilder()
- .setFileId(fileId)
- .setOffset(offset)
- .setSize(bytesLength)
- .setMtime(System.currentTimeMillis() / 10000L)
- .setETag(etag)
- .setCipherKey(cipherKeyString)
- );
- }
-
// cache fileId ~ bytes
SeaweedRead.chunkCache.setChunk(fileId, bytes);
+ return FilerProto.FileChunk.newBuilder()
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+ .setMtime(System.currentTimeMillis() / 10000L)
+ .setETag(etag)
+ .setCipherKey(cipherKeyString);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
+
+ int chunkSize = entry.getChunksCount();
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
+
synchronized (entry) {
+ entry.clearChunks();
+ entry.addAllChunks(chunks);
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
- public void testNonOverlappingVisibleIntervals() {
+ public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}