author     hilimd <68371223+hilimd@users.noreply.github.com>  2020-07-21 14:08:18 +0800
committer  GitHub <noreply@github.com>  2020-07-21 14:08:18 +0800
commit     6ea4ce722704171fdbddba61f423202a620b6ecf (patch)
tree       b4e3e1deb1133cc7d34757c0981e30dabe196a03
parent     5850bb733936399babbe2d77f4b27cac312e2798 (diff)
parent     885c624bceb61688c806b91350e70d75088c6eea (diff)
download   seaweedfs-6ea4ce722704171fdbddba61f423202a620b6ecf.tar.xz
           seaweedfs-6ea4ce722704171fdbddba61f423202a620b6ecf.zip
Merge pull request #3 from chrislusf/master
sync
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java | 23
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java | 135
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerClient.java | 86
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 17
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java | 9
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 42
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java | 5
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 3
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java | 7
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 8
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 3
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java | 7
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 8
-rw-r--r--  weed/filer2/filechunk_manifest.go | 142
-rw-r--r--  weed/filer2/filechunk_manifest_test.go | 113
-rw-r--r--  weed/filer2/filechunks.go | 32
-rw-r--r--  weed/filer2/filechunks_test.go | 10
-rw-r--r--  weed/filer2/reader_at.go | 17
-rw-r--r--  weed/filer2/stream.go | 30
-rw-r--r--  weed/filesys/dirty_page.go | 50
-rw-r--r--  weed/filesys/file.go | 2
-rw-r--r--  weed/filesys/filehandle.go | 13
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go | 23
-rw-r--r--  weed/filesys/wfs_write.go | 66
-rw-r--r--  weed/replication/sink/azuresink/azure_sink.go | 2
-rw-r--r--  weed/replication/sink/b2sink/b2_sink.go | 2
-rw-r--r--  weed/replication/sink/filersink/filer_sink.go | 26
-rw-r--r--  weed/replication/sink/gcssink/gcs_sink.go | 2
-rw-r--r--  weed/replication/sink/s3sink/s3_sink.go | 2
-rw-r--r--  weed/server/filer_grpc_server.go | 73
-rw-r--r--  weed/server/filer_server_handlers_write.go | 5
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 28
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go | 2
-rw-r--r--  weed/server/webdav_server.go | 2
-rw-r--r--  weed/shell/command_volume_fsck.go | 8
-rw-r--r--  weed/storage/needle/volume_ttl.go | 10
36 files changed, 822 insertions(+), 191 deletions(-)
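
This merge brings the chunk-manifest work from chrislusf/master into the fork: when a file accumulates a large number of chunks, batches of them are packed into a manifest chunk (a FileChunkManifest protobuf stored like any other chunk) that is expanded back into data chunks on read. The Java client, the HDFS adapters, and the Go filer/mount/replication paths are all updated to resolve manifests. A minimal read-side sketch in Java, assuming a connected FilerGrpcClient and an already-looked-up entry (the surrounding setup is hypothetical, not part of the commit):

    List<FilerProto.FileChunk> chunks = entry.getChunksList();
    if (FileChunkManifest.hasChunkManifest(chunks)) {
        // manifest chunks are fetched and expanded, recursively, into plain data chunks
        chunks = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunks);
    }
    // the new read path performs the same resolution internally:
    List<SeaweedRead.VisibleInterval> intervals =
            SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
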
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
index 897fe9694..55f003a18 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -1,22 +1,39 @@
package seaweedfs.client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ByteBufferPool {
- static List<ByteBuffer> bufferList = new ArrayList<>();
+ private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+ private static final List<ByteBuffer> bufferList = new ArrayList<>();
public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ bufferSize = MIN_BUFFER_SIZE;
+ }
if (bufferList.isEmpty()) {
return ByteBuffer.allocate(bufferSize);
}
- return bufferList.remove(bufferList.size()-1);
+ ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+ if (buffer.capacity() >= bufferSize) {
+ return buffer;
+ }
+
+ LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+ bufferList.add(0, buffer);
+ return ByteBuffer.allocate(bufferSize);
+
}
public static synchronized void release(ByteBuffer obj) {
- bufferList.add(obj);
+ bufferList.add(0, obj);
}
}
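
The pool now enforces an 8 MB minimum buffer size, serves the most recently released buffer from the tail of the list, and, when that buffer is too small for the request, parks it at the head and allocates a fresh one. A minimal usage sketch, assuming the caller supplies its own data and length (illustrative, not part of the commit):

    ByteBuffer buffer = ByteBufferPool.request(4 * 1024 * 1024); // rounded up to the 8 MB minimum
    try {
        buffer.clear();
        buffer.put(data, 0, length); // 'data' and 'length' are assumed to be caller-provided
        // ... hand the filled buffer to the writer ...
    } finally {
        ByteBufferPool.release(buffer); // returns the buffer to the pool for reuse
    }
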
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..d8d29ede8
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,135 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileChunkManifest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+ private static final int mergeFactor = 1000;
+
+ public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (chunk.getIsChunkManifest()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<FilerProto.FileChunk> resolveChunkManifest(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ continue;
+ }
+
+ // IsChunkManifest
+ LOG.debug("fetching chunk manifest:{}", chunk);
+ byte[] data = fetchChunk(filerGrpcClient, chunk);
+ FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+ List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk t : m.getChunksList()) {
+ // avoid deprecated chunk.getFileId()
+ resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+ }
+ dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+ }
+
+ return dataChunks;
+ }
+
+ private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ String vid = "" + chunk.getFid().getVolumeId();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ FilerProto.Locations locations = vid2Locations.get(vid);
+
+ SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+ FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+ 0,
+ -1,
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed());
+
+ byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+ if (chunkData == null) {
+ LOG.debug("doFetchFullChunkData:{}", chunkView);
+ chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+ }
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+
+ return chunkData;
+
+ }
+
+ public static List<FilerProto.FileChunk> maybeManifestize(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks) throws IOException {
+ // the return variable
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : inputChunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ } else {
+ chunks.add(chunk);
+ }
+ }
+
+ int remaining = dataChunks.size();
+ for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor));
+ chunks.add(chunk);
+ remaining -= mergeFactor;
+ }
+
+ // remaining
+ for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+ chunks.add(dataChunks.get(i));
+ }
+ return chunks;
+ }
+
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException {
+ // create and serialize the manifest
+ dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
+ FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+ byte[] data = m.build().toByteArray();
+
+ long minOffset = Long.MAX_VALUE;
+ long maxOffset = -1;
+ for (FilerProto.FileChunk chunk : dataChunks) {
+ minOffset = Math.min(minOffset, chunk.getOffset());
+ maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+ }
+
+ FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+ filerGrpcClient.getReplication(),
+ filerGrpcClient,
+ minOffset,
+ data, 0, data.length);
+ manifestChunk.setIsChunkManifest(true);
+ manifestChunk.setSize(maxOffset - minOffset);
+ return manifestChunk.build();
+
+ }
+
+}
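
maybeManifestize merges full batches of mergeFactor (1,000) data chunks into single manifest chunks and passes everything else through: existing manifest chunks are kept as-is, and because the loop condition is i + mergeFactor < size, a batch is only formed while more than 1,000 data chunks remain. For example, 2,500 data chunks become 2 manifest chunks plus 500 plain chunks, while exactly 1,000 are left untouched. An illustrative call, assuming an entry builder populated by SeaweedWrite (not part of the commit):

    // compact the chunk list before persisting the entry metadata
    List<FilerProto.FileChunk> compacted =
            FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
    entry.clearChunks();
    entry.addAllChunks(compacted);
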
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 2103fc699..468a95e28 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -24,6 +24,67 @@ public class FilerClient {
this.filerGrpcClient = filerGrpcClient;
}
+ public static String toFileId(FilerProto.FileId fid) {
+ if (fid == null) {
+ return null;
+ }
+ return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ }
+
+ public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+ if (fileIdStr == null || fileIdStr.length() == 0) {
+ return null;
+ }
+ int commaIndex = fileIdStr.lastIndexOf(',');
+ String volumeIdStr = fileIdStr.substring(0, commaIndex);
+ String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+ String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+ return FilerProto.FileId.newBuilder()
+ .setVolumeId(Integer.parseInt(volumeIdStr))
+ .setFileKey(Long.parseLong(fileKeyStr, 16))
+ .setCookie((int) Long.parseLong(cookieStr, 16))
+ .build();
+ }
+
+ public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+ List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : chunks) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.clearFileId();
+ chunkBuilder.clearSourceFileId();
+ chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+ FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+ if (sourceFid != null) {
+ chunkBuilder.setSourceFid(sourceFid);
+ }
+ cleanedChunks.add(chunkBuilder.build());
+ }
+ return cleanedChunks;
+ }
+
+ public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.setFileId(toFileId(chunk.getFid()));
+ String sourceFileId = toFileId(chunk.getSourceFid());
+ if (sourceFileId != null) {
+ chunkBuilder.setSourceFileId(sourceFileId);
+ }
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
+ }
+
public boolean mkdirs(String path, int mode) {
String currentUser = System.getProperty("user.name");
return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -184,7 +245,7 @@ public class FilerClient {
List<FilerProto.Entry> entries = new ArrayList<>();
while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
- entries.add(fixEntryAfterReading(resp.getEntry()));
+ entries.add(afterEntryDeserialization(resp.getEntry()));
}
return entries;
}
@@ -199,7 +260,7 @@ public class FilerClient {
if (entry == null) {
return null;
}
- return fixEntryAfterReading(entry);
+ return afterEntryDeserialization(entry);
} catch (Exception e) {
if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
@@ -209,7 +270,6 @@ public class FilerClient {
}
}
-
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
@@ -267,24 +327,4 @@ public class FilerClient {
return true;
}
- private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
- if (entry.getChunksList().size() <= 0) {
- return entry;
- }
- String fileId = entry.getChunks(0).getFileId();
- if (fileId != null && fileId.length() != 0) {
- return entry;
- }
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- entryBuilder.clearChunks();
- for (FilerProto.FileChunk chunk : entry.getChunksList()) {
- FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
- FilerProto.FileId fid = chunk.getFid();
- fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
- chunkBuilder.setFileId(fileId);
- entryBuilder.addChunks(chunkBuilder);
- }
- return entryBuilder.build();
- }
-
}
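
The new static toFileId renders the volume id in decimal, the file key in hex, and the cookie as a zero-padded 8-digit hex field ("%d,%x%08x"), replacing the old fixEntryAfterReading format ("%d,%d%x") whose decimal file key could not be parsed back by toFileIdObject. A round-trip sketch with made-up values (illustrative only):

    FilerProto.FileId fid = FilerProto.FileId.newBuilder()
            .setVolumeId(3)
            .setFileKey(0x2c87L)          // hypothetical file key
            .setCookie(0x1f2a3b4c)        // hypothetical cookie
            .build();
    String fileIdStr = FilerClient.toFileId(fid);                     // "3,2c871f2a3b4c"
    FilerProto.FileId parsed = FilerClient.toFileIdObject(fileIdStr); // equals fid again
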
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 301919919..f0490540d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -2,16 +2,12 @@ package seaweedfs.client;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.util.*;
@@ -19,7 +15,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(16);
+ static ChunkCache chunkCache = new ChunkCache(0);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -44,7 +40,8 @@ public class SeaweedRead {
int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) {
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
- if (locations.getLocationsCount() == 0) {
+ if (locations == null || locations.getLocationsCount() == 0) {
+ LOG.error("failed to locate {}", chunkView.fileId);
// log here!
return 0;
}
@@ -77,7 +74,7 @@ public class SeaweedRead {
return len;
}
- private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -138,7 +135,11 @@ public class SeaweedRead {
return views;
}
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+ chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
index e2835b718..c465d935f 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -9,19 +9,22 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class SeaweedUtil {
static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+ static CloseableHttpClient httpClient;
static {
// Increase max total connection to 200
cm.setMaxTotal(200);
// Increase default max connection per route to 20
cm.setDefaultMaxPerRoute(20);
- }
- public static CloseableHttpClient getClosableHttpClient() {
- return HttpClientBuilder.create()
+ httpClient = HttpClientBuilder.create()
.setConnectionManager(cm)
.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
}
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return httpClient;
+ }
}
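
The pooled CloseableHttpClient is now built once in the static initializer and shared, rather than rebuilt on every getClosableHttpClient() call against the same connection manager. Callers are expected to reuse the shared client and close only the response so the pooled connection is returned; a sketch of that pattern, with a hypothetical URL (not part of the commit):

    CloseableHttpClient client = SeaweedUtil.getClosableHttpClient();
    HttpGet request = new HttpGet("http://127.0.0.1:8888/some/path"); // hypothetical target
    try (CloseableHttpResponse response = client.execute(request)) {
        EntityUtils.consume(response.getEntity()); // drain so the connection can be reused
    }
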
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index e9819668c..fd54453a1 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,8 +1,6 @@
package seaweedfs.client;
import com.google.protobuf.ByteString;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -10,10 +8,10 @@ import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
+import java.util.List;
public class SeaweedWrite {
@@ -25,6 +23,17 @@ public class SeaweedWrite {
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
+ synchronized (entry) {
+ entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength));
+ }
+ }
+
+ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final FilerGrpcClient filerGrpcClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerGrpcClient.getCollection())
@@ -46,25 +55,28 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- synchronized (entry) {
- entry.addChunks(FilerProto.FileChunk.newBuilder()
- .setFileId(fileId)
- .setOffset(offset)
- .setSize(bytesLength)
- .setMtime(System.currentTimeMillis() / 10000L)
- .setETag(etag)
- .setCipherKey(cipherKeyString)
- );
- }
-
// cache fileId ~ bytes
SeaweedRead.chunkCache.setChunk(fileId, bytes);
+ return FilerProto.FileChunk.newBuilder()
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+ .setMtime(System.currentTimeMillis() / 10000L)
+ .setETag(etag)
+ .setCipherKey(cipherKeyString);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
+
+ int chunkSize = entry.getChunksCount();
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
+
synchronized (entry) {
+ entry.clearChunks();
+ entry.addAllChunks(chunks);
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
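
writeChunk is factored out of the entry-writing path so FileChunkManifest can reuse it to store manifest data, and writeMeta now runs the accumulated chunk list through maybeManifestize before calling createEntry. A minimal end-to-end sketch, assuming a single in-memory payload and a hypothetical target directory (illustrative only):

    FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder().setName("bigfile.dat"); // hypothetical entry
    byte[] bytes = "hello world".getBytes();
    FilerProto.FileChunk.Builder chunk = SeaweedWrite.writeChunk(
            filerGrpcClient.getReplication(), filerGrpcClient, 0, bytes, 0, bytes.length);
    entry.addChunks(chunk);
    // writeMeta manifestizes the chunk list (batches of 1,000) before createEntry
    SeaweedWrite.writeMeta(filerGrpcClient, "/some/dir", entry);
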
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
- public void testNonOverlappingVisibleIntervals() {
+ public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c26ad728f..6b3c72f7d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
- final int bufferSize) {
+ final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
@@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream {
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
}
}
- return (int)bytesRead;
+ return (int) bytesRead;
}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 46de0c443..d62d74fb1 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -56,12 +56,12 @@ public class SeaweedOutputStream extends OutputStream {
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
- 10L,
+ 120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream {
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
@@ -180,7 +180,7 @@ public class SeaweedOutputStream extends OutputStream {
bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
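
Both the hdfs2 and hdfs3 copies of SeaweedOutputStream make the same tuning change: the upload executor drops from 4 threads per processor to 1, keep-alive rises from 10 to 120 seconds, and the stream now waits for a task to complete once the queue holds maxConcurrentRequestCount entries instead of twice that. The resulting executor configuration, restated as a sketch (values taken from this diff):

    int maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
    ThreadPoolExecutor threadExecutor = new ThreadPoolExecutor(
            maxConcurrentRequestCount,      // core pool size
            maxConcurrentRequestCount,      // maximum pool size
            120L, TimeUnit.SECONDS,         // keep-alive, up from 10 seconds
            new LinkedBlockingQueue<Runnable>());
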
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c26ad728f..6b3c72f7d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
- final int bufferSize) {
+ final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
@@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream {
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
}
}
- return (int)bytesRead;
+ return (int) bytesRead;
}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index c602a0d81..05805b9e5 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
- 10L,
+ 120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
@@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
new file mode 100644
index 000000000..62d2c6e7f
--- /dev/null
+++ b/weed/filer2/filechunk_manifest.go
@@ -0,0 +1,142 @@
+package filer2
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ ManifestBatch = 1000
+)
+
+func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
+ for _, chunk := range chunks {
+ if chunk.IsChunkManifest {
+ return true
+ }
+ }
+ return false
+}
+
+func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) {
+ // TODO maybe parallel this
+ for _, chunk := range chunks {
+ if !chunk.IsChunkManifest {
+ dataChunks = append(dataChunks, chunk)
+ continue
+ }
+
+ // IsChunkManifest
+ data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
+ if err != nil {
+ return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
+ }
+ m := &filer_pb.FileChunkManifest{}
+ if err := proto.Unmarshal(data, m); err != nil {
+ return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
+ }
+ manifestChunks = append(manifestChunks, chunk)
+ // recursive
+ filer_pb.AfterEntryDeserialization(m.Chunks)
+ dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
+ if subErr != nil {
+ return chunks, nil, subErr
+ }
+ dataChunks = append(dataChunks, dchunks...)
+ manifestChunks = append(manifestChunks, mchunks...)
+ }
+ return
+}
+
+// TODO fetch from cache for weed mount?
+func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+ urlString, err := lookupFileIdFn(fileId)
+ if err != nil {
+ glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+ var buffer bytes.Buffer
+ err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(0).Infof("read %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+
+ return buffer.Bytes(), nil
+}
+
+func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
+ return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
+}
+
+func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
+
+ var dataChunks []*filer_pb.FileChunk
+ for _, chunk := range inputChunks {
+ if !chunk.IsChunkManifest {
+ dataChunks = append(dataChunks, chunk)
+ } else {
+ chunks = append(chunks, chunk)
+ }
+ }
+
+ remaining := len(dataChunks)
+ for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
+ chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
+ if err != nil {
+ return dataChunks, err
+ }
+ chunks = append(chunks, chunk)
+ remaining -= mergeFactor
+ }
+ // remaining
+ for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
+ chunks = append(chunks, dataChunks[i])
+ }
+ return
+}
+
+func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
+
+ filer_pb.BeforeEntrySerialization(dataChunks)
+
+ // create and serialize the manifest
+ data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
+ Chunks: dataChunks,
+ })
+ if serErr != nil {
+ return nil, fmt.Errorf("serializing manifest: %v", serErr)
+ }
+
+ minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
+ for _, chunk := range dataChunks {
+ if minOffset > int64(chunk.Offset) {
+ minOffset = chunk.Offset
+ }
+ if maxOffset < int64(chunk.Size)+chunk.Offset {
+ maxOffset = int64(chunk.Size) + chunk.Offset
+ }
+ }
+
+ manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
+ if err != nil {
+ return nil, err
+ }
+ manifestChunk.IsChunkManifest = true
+ manifestChunk.Offset = minOffset
+ manifestChunk.Size = uint64(maxOffset - minOffset)
+
+ return
+}
+
+type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)
diff --git a/weed/filer2/filechunk_manifest_test.go b/weed/filer2/filechunk_manifest_test.go
new file mode 100644
index 000000000..2b0862d07
--- /dev/null
+++ b/weed/filer2/filechunk_manifest_test.go
@@ -0,0 +1,113 @@
+package filer2
+
+import (
+ "bytes"
+ "math"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestDoMaybeManifestize(t *testing.T) {
+ var manifestTests = []struct {
+ inputs []*filer_pb.FileChunk
+ expected []*filer_pb.FileChunk
+ }{
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: false},
+ {FileId: "2", IsChunkManifest: false},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "12", IsChunkManifest: true},
+ {FileId: "34", IsChunkManifest: true},
+ },
+ },
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "2", IsChunkManifest: false},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "23", IsChunkManifest: true},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ },
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: false},
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "13", IsChunkManifest: true},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ },
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "34", IsChunkManifest: true},
+ },
+ },
+ }
+
+ for i, mtest := range manifestTests {
+ println("test", i)
+ actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge)
+ assertEqualChunks(t, mtest.expected, actual)
+ }
+
+}
+
+func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) {
+ assert.Equal(t, len(expected), len(actual))
+ for i := 0; i < len(actual); i++ {
+ assertEqualChunk(t, actual[i], expected[i])
+ }
+}
+func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) {
+ assert.Equal(t, expected.FileId, actual.FileId)
+ assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest)
+}
+
+func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
+
+ var buf bytes.Buffer
+ minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
+ for k := 0; k < len(dataChunks); k++ {
+ chunk := dataChunks[k]
+ buf.WriteString(chunk.FileId)
+ if minOffset > int64(chunk.Offset) {
+ minOffset = chunk.Offset
+ }
+ if maxOffset < int64(chunk.Size)+chunk.Offset {
+ maxOffset = int64(chunk.Size) + chunk.Offset
+ }
+ }
+
+ manifestChunk = &filer_pb.FileChunk{
+ FileId: buf.String(),
+ }
+ manifestChunk.IsChunkManifest = true
+ manifestChunk.Offset = minOffset
+ manifestChunk.Size = uint64(maxOffset - minOffset)
+
+ return
+}
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 6832d0f31..ea7772b4a 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -46,9 +46,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x", h.Sum32())
}
-func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
- visibles := NonOverlappingVisibleIntervals(chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
fileIds := make(map[string]bool)
for _, interval := range visibles {
@@ -65,7 +65,23 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
return
}
-func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
+func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+
+ aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+ if aErr != nil {
+ return nil, aErr
+ }
+ bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+ if bErr != nil {
+ return nil, bErr
+ }
+
+ delta = append(delta, DoMinusChunks(aData, bData)...)
+ delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
+ return
+}
+
+func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
for _, interval := range bs {
@@ -94,9 +110,9 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
}
-func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
- visibles := NonOverlappingVisibleIntervals(chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
return ViewFromVisibleIntervals(visibles, offset, size)
@@ -190,7 +206,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
return newVisibles
}
-func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
+// If the file chunk content is a chunk manifest
+func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+
+ chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].Mtime < chunks[j].Mtime
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index 7b1133b85..bfee59198 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -16,7 +16,7 @@ func TestCompactFileChunks(t *testing.T) {
{Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
}
- compacted, garbage := CompactFileChunks(chunks)
+ compacted, garbage := CompactFileChunks(nil, chunks)
if len(compacted) != 3 {
t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -49,7 +49,7 @@ func TestCompactFileChunks2(t *testing.T) {
})
}
- compacted, garbage := CompactFileChunks(chunks)
+ compacted, garbage := CompactFileChunks(nil, chunks)
if len(compacted) != 4 {
t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -186,7 +186,7 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
- intervals := NonOverlappingVisibleIntervals(testcase.Chunks)
+ intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
for x, interval := range intervals {
log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
i, x, interval.start, interval.stop, interval.fileId)
@@ -371,7 +371,7 @@ func TestChunksReading(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
- chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
+ chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
for x, chunk := range chunks {
log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
i, x, chunk.Offset, chunk.Size, chunk.FileId)
@@ -415,6 +415,6 @@ func BenchmarkCompactFileChunks(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- CompactFileChunks(chunks)
+ CompactFileChunks(nil, chunks)
}
}
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index 11a80443f..568d94267 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -1,7 +1,6 @@
package filer2
import (
- "bytes"
"context"
"fmt"
"io"
@@ -9,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -144,19 +142,6 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
- urlString, err := c.lookupFileId(fileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
- }
- var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(0).Infof("read %s failed, err: %v", fileId, err)
- return nil, err
- }
+ return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
- return buffer.Bytes(), nil
}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 033a8dd13..c7df007ec 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -2,6 +2,7 @@ package filer2
import (
"bytes"
+ "fmt"
"io"
"math"
"strings"
@@ -14,7 +15,8 @@ import (
func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- chunkViews := ViewFromChunks(chunks, offset, size)
+ fmt.Printf("start to stream content for chunks: %+v\n", chunks)
+ chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
fileId2Url := make(map[string]string)
@@ -50,14 +52,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
buffer := bytes.Buffer{}
- chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
-
- lookupFileId := func(fileId string) (targetUrl string, err error) {
+ lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
return masterClient.LookupFileId(fileId)
}
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
for _, chunkView := range chunkViews {
- urlString, err := lookupFileId(chunkView.FileId)
+ urlString, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return nil, err
@@ -88,23 +90,27 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+ lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ return masterClient.LookupFileId(fileId)
+ }
+
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
return &ChunkStreamReader{
- chunkViews: chunkViews,
- lookupFileId: func(fileId string) (targetUrl string, err error) {
- return masterClient.LookupFileId(fileId)
- },
+ chunkViews: chunkViews,
+ lookupFileId: lookupFileIdFn,
}
}
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+ lookupFileIdFn := LookupFn(filerClient)
+
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
return &ChunkStreamReader{
chunkViews: chunkViews,
- lookupFileId: LookupFn(filerClient),
+ lookupFileId: lookupFileIdFn,
}
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 45224b3e7..46d20e466 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,16 +2,12 @@ package filesys
import (
"bytes"
- "context"
- "fmt"
"io"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
- var fileId, host string
- var auth security.EncodedJwt
-
dir, _ := pages.f.fullpath().DirAndName()
- if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: pages.f.wfs.option.Replication,
- Collection: pages.f.wfs.option.Collection,
- TtlSec: pages.f.wfs.option.TtlSec,
- DataCenter: pages.f.wfs.option.DataCenter,
- ParentPath: dir,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = pages.f.wfs.AdjustedUrl(host)
- pages.collection, pages.replication = resp.Collection, resp.Replication
-
- return nil
- }); err != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ return nil, err
}
- pages.f.wfs.chunkCache.SetChunk(fileId, data)
+ pages.collection, pages.replication = collection, replication
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return chunk, nil
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4a6bc9a8a..dcda93522 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+ file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks)
file.reader = nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 9b9df916c..31fd08f97 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
return 0, nil
}
+ var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+ }
fh.f.reader = nil
}
@@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
+ chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+ }
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index fdb486ba4..edf329143 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -8,10 +8,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
)
+// need to have logic similar to FilerStoreWrapper
+// e.g. fill fileId field for chunks
+
type MetaCache struct {
actualStore filer2.FilerStore
sync.RWMutex
@@ -46,6 +50,7 @@ func openMetaStore(dbFolder string) filer2.FilerStore {
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
mc.Lock()
defer mc.Unlock()
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
return mc.actualStore.InsertEntry(ctx, entry)
}
@@ -78,13 +83,19 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error {
mc.Lock()
defer mc.Unlock()
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
return mc.actualStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
mc.RLock()
defer mc.RUnlock()
- return mc.actualStore.FindEntry(ctx, fp)
+ entry, err = mc.actualStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
@@ -96,7 +107,15 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
mc.RLock()
defer mc.RUnlock()
- return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+
+ entries, err := mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, err
}
func (mc *MetaCache) Shutdown() {
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
new file mode 100644
index 000000000..786d0b42a
--- /dev/null
+++ b/weed/filesys/wfs_write.go
@@ -0,0 +1,66 @@
+package filesys
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DataCenter: wfs.option.DataCenter,
+ ParentPath: dir,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ host = wfs.AdjustedUrl(host)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ wfs.chunkCache.SetChunk(fileId, data)
+
+ chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ return chunk, "", "", nil
+ }
+}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index aef97c06e..fa229de22 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -96,7 +96,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+ chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
// Create a URL that references a to-be-created blob in your
// Azure Storage account's container.
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 1e7d82ed4..bf8632827 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -85,7 +85,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+ chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil {
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 50721a8f3..6429859b4 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -167,12 +167,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
glog.V(0).Infof("already replicated %s", key)
} else {
// find out what changed
- deletedChunks, newChunks := compareChunks(oldEntry, newEntry)
+ deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry)
+ if err != nil {
+ return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err)
+ }
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
- existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
+ existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks)
}
// replicate the chunks that are new in the source
@@ -200,8 +203,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
})
}
-func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
- deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
- newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
+func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+ aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
+ if aErr != nil {
+ return nil, nil, aErr
+ }
+ bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks)
+ if bErr != nil {
+ return nil, nil, bErr
+ }
+
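+ // diff both the resolved data chunks and the manifest chunks themselves, so manifest chunk blobs are deleted and replicated alongside the data they describe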
+ deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...)
+ deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...)
+
+ newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...)
+ newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...)
+
return
}
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index bb5a54272..4b58160db 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -90,7 +90,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+ chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index d7af105b8..625cf406c 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -108,7 +108,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+ chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
parts := make([]*s3.CompletedPart, len(chunkViews))
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 17e32731c..48e9253f0 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -137,13 +138,28 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
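+// lookupFileId turns a file id into an HTTP URL on a volume server that holds the corresponding volume, based on the locations known to the filer's master client.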
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) {
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ return "", err
+ }
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
+ if !found || len(locations) == 0 {
+ return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ }
+ return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil
+}
+
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
glog.V(4).Infof("CreateEntry %v", req)
resp = &filer_pb.CreateEntryResponse{}
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry)
+ if err2 != nil {
+ return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
+ }
if req.Entry.Attributes == nil {
glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
@@ -158,7 +174,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
}, req.OExcl, req.IsFromOtherCluster)
if createErr == nil {
- fs.filer.DeleteChunks(garbages)
+ fs.filer.DeleteChunks(garbage)
} else {
glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
resp.Error = createErr.Error()
@@ -177,10 +193,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
- // remove old chunks if not included in the new ones
- unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
-
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry)
+ if err2 != nil {
+ return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
+ }
newEntry := &filer2.Entry{
FullPath: util.JoinPath(req.Directory, req.Entry.Name),
@@ -214,8 +230,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(unusedChunks)
- fs.filer.DeleteChunks(garbages)
+ fs.filer.DeleteChunks(garbage)
} else {
glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
@@ -225,6 +240,37 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err
}
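+// cleanupChunks decides which chunks to keep for newEntry and which become garbage: chunks of existingEntry that are no longer referenced, plus chunks fully covered by newer writes; long chunk lists may additionally be folded into manifest chunks.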
+func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+ chunks = newEntry.Chunks
+
+ // remove old chunks if not included in the new ones
+ if existingEntry != nil {
+ garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+ if err != nil {
+ return chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+ }
+ }
+
+ // files with manifest chunks are usually large and append-only, so skip computing covered chunks
+ var coveredChunks []*filer_pb.FileChunk
+ if !filer2.HasChunkManifest(newEntry.Chunks) {
+ chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks)
+ garbage = append(garbage, coveredChunks...)
+ }
+
+ chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+ newEntry.Attributes.Replication,
+ newEntry.Attributes.Collection,
+ "",
+ needle.SecondsToTTL(newEntry.Attributes.TtlSec),
+ false), chunks)
+ if err != nil {
+ // manifestization is best-effort: log the error and continue with the current chunks
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
+ return
+}
+
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
glog.V(4).Infof("AppendToEntry %v", req)
@@ -254,6 +300,17 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
entry.Chunks = append(entry.Chunks, req.Chunks...)
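+ // opportunistically fold the appended chunk list into manifest chunks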
+ entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+ entry.Replication,
+ entry.Collection,
+ "",
+ needle.SecondsToTTL(entry.TtlSec),
+ false), entry.Chunks)
+ if err != nil {
+ // manifestization is best-effort: log the error and continue with the appended chunks
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
+
err = fs.filer.CreateEntry(context.Background(), entry, false, false)
return &filer_pb.AppendToEntryResponse{}, err
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index a642c502a..da66178ce 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -40,7 +40,7 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
@@ -67,7 +67,6 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
- writeJsonError(w, r, http.StatusInternalServerError, ae)
err = ae
return
}
@@ -114,7 +113,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 29546542c..be0438efb 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
limitedReader := io.LimitReader(partReader, int64(chunkSize))
// assign one file id for one chunk
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
if assignErr != nil {
return nil, assignErr
}
@@ -132,6 +132,12 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
}
}
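+ // consolidate the uploaded chunks into manifest chunks where applicable before creating the entry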
+ fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
+
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if fileName != "" {
@@ -184,3 +190,23 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
return uploadResult, err
}
+
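+// saveAsChunk returns the SaveDataAsChunkFunctionType passed to filer2.MaybeManifestize: each call assigns a fresh file id and uploads the given bytes as an ordinary chunk.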
+func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
+ if assignErr != nil {
+ return nil, "", "", assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth)
+ if uploadErr != nil {
+ return nil, "", "", uploadErr
+ }
+
+ return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+ }
+}
+
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 17f35838d..8413496b8 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -19,7 +19,7 @@ import (
func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index e8bedd352..8655daf70 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -474,7 +474,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+ f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks)
f.reader = nil
}
if f.reader == nil {
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 69a1a63b4..cf5ad6d6d 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -11,6 +11,7 @@ import (
"path/filepath"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -196,7 +197,12 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
files[i.vid].Write(buffer)
}
}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
- for _, chunk := range entry.Entry.Chunks {
+ dChunks, mChunks, resolveErr := filer2.ResolveChunkManifest(filer2.LookupFn(c.env), entry.Entry.Chunks)
+ if resolveErr != nil {
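+ // skip entries whose chunk manifests cannot be resolved instead of failing the whole scan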
+ return nil
+ }
+ dChunks = append(dChunks, mChunks...)
+ for _, chunk := range dChunks {
outputChan <- &Item{
vid: chunk.Fid.VolumeId,
fileKey: chunk.Fid.FileKey,
diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go
index 179057876..26ce3b8fd 100644
--- a/weed/storage/needle/volume_ttl.go
+++ b/weed/storage/needle/volume_ttl.go
@@ -1,11 +1,12 @@
package needle
import (
+ "fmt"
"strconv"
)
const (
- //stored unit types
+ // stored unit types
Empty byte = iota
Minute
Hour
@@ -139,3 +140,10 @@ func (t TTL) Minutes() uint32 {
}
return 0
}
+
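+// SecondsToTTL converts a TTL in seconds to the minute-granularity string form used when assigning volumes (e.g. "45m"); sub-minute remainders are truncated, and 0 means no TTL.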
+func SecondsToTTL(seconds int32) string {
+ if seconds == 0 {
+ return ""
+ }
+ return fmt.Sprintf("%dm", seconds/60)
+}