From 735912e0f5de0d1d287fa6683504624a52e842e9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 17:39:18 -0700 Subject: enable read cache --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 +- other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index f0490540d..1d8ded426 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -15,7 +15,7 @@ public class SeaweedRead { private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); - static ChunkCache chunkCache = new ChunkCache(0); + static ChunkCache chunkCache = new ChunkCache(4); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 5f4d888bd..d3cecab75 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -61,9 +61,6 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - // cache fileId ~ bytes - SeaweedRead.chunkCache.setChunk(fileId, bytes); - LOG.debug("write file chunk {} size {}", targetUrl, bytesLength); return FilerProto.FileChunk.newBuilder() -- cgit v1.2.3 From b684c312d232f4b8ffa3bb276033fd69c469d354 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 18:25:35 -0700 Subject: minor --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 1d8ded426..af3aac0f5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -62,6 +62,7 @@ public class SeaweedRead { if (chunkData == null) { chunkData = doFetchFullChunkData(chunkView, locations); + chunkCache.setChunk(chunkView.fileId, chunkData); } int len = (int) chunkView.size; @@ -69,8 +70,6 @@ public class SeaweedRead { chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); - chunkCache.setChunk(chunkView.fileId, chunkData); - return len; } -- cgit v1.2.3 From daf8c0c8ce4c507264a175912d4693415f40f606 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 18:25:52 -0700 Subject: cache vid locations --- .../main/java/seaweedfs/client/FileChunkManifest.java | 17 ++++++++++------- .../src/main/java/seaweedfs/client/FilerGrpcClient.java | 3 +++ 2 files changed, 13 insertions(+), 7 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index d8d29ede8..a15671f46 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -6,7 +6,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; public class FileChunkManifest { @@ -51,13 +50,17 @@ public class FileChunkManifest { private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { - FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); String vid = "" + chunk.getFid().getVolumeId(); - lookupRequest.addVolumeIds(vid); - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); - Map vid2Locations = lookupResponse.getLocationsMapMap(); - FilerProto.Locations locations = vid2Locations.get(vid); + FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid); + if (locations == null) { + FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); + lookupRequest.addVolumeIds(vid); + FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + .getBlockingStub().lookupVolume(lookupRequest.build()); + locations = lookupResponse.getLocationsMapMap().get(vid); + filerGrpcClient.vidLocations.put(vid, locations); + LOG.warn("fetchChunk vid:{} locations:{}", vid, locations); + } SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView( FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId() diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 57b67f6b0..1a719f3c0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -9,6 +9,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { @@ -24,6 +26,7 @@ public class FilerGrpcClient { } } + public final Map vidLocations = new HashMap<>(); private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; -- cgit v1.2.3 From 5080bc1d6964cc71044333edc7ee36c1c1f06adb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 18:26:26 -0700 Subject: debug --- other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java | 2 +- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index a15671f46..28c2f47fc 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -59,7 +59,7 @@ public class FileChunkManifest { .getBlockingStub().lookupVolume(lookupRequest.build()); locations = lookupResponse.getLocationsMapMap().get(vid); filerGrpcClient.vidLocations.put(vid, locations); - LOG.warn("fetchChunk vid:{} locations:{}", vid, locations); + LOG.debug("fetchChunk vid:{} locations:{}", vid, locations); } SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView( diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index af3aac0f5..05457ed48 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -108,6 +108,8 @@ public class SeaweedRead { } } + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); + return data; } -- cgit v1.2.3 From 8cc35e2c13cb2385e0b2d7a0ee926b56ef757379 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 1 Aug 2020 11:18:34 -0700 Subject: fix wrong decompression logic --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 05457ed48..48d942f88 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -97,7 +97,7 @@ public class SeaweedRead { } if (chunkView.isCompressed) { - data = Gzip.decompress(data); + // data = Gzip.decompress(data); } if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { -- cgit v1.2.3 From 4d9da157bc4794524855f455d9df55c73f0dbdb0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 1 Aug 2020 11:36:29 -0700 Subject: HDFS: read gzip content --- .../src/main/java/seaweedfs/client/SeaweedRead.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 48d942f88..9edbfb799 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -2,6 +2,7 @@ package seaweedfs.client; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; +import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.util.EntityUtils; @@ -78,7 +79,7 @@ public class SeaweedRead { HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); byte[] data = null; @@ -87,6 +88,18 @@ public class SeaweedRead { try { HttpEntity entity = response.getEntity(); + Header contentEncodingHeader = entity.getContentEncoding(); + + if (contentEncodingHeader != null) { + HeaderElement[] encodings =contentEncodingHeader.getElements(); + for (int i = 0; i < encodings.length; i++) { + if (encodings[i].getName().equalsIgnoreCase("gzip")) { + entity = new GzipDecompressingEntity(entity); + break; + } + } + } + data = EntityUtils.toByteArray(entity); EntityUtils.consume(entity); @@ -96,10 +109,6 @@ public class SeaweedRead { request.releaseConnection(); } - if (chunkView.isCompressed) { - // data = Gzip.decompress(data); - } - if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { try { data = SeaweedCipher.decrypt(data, chunkView.cipherKey); -- cgit v1.2.3 From e1f070a9a1699578de4f1513f62453c7a490514f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 1 Aug 2020 12:42:41 -0700 Subject: Hadoop: 1.4.4 --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 9edbfb799..fa44ee4af 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -1,5 +1,7 @@ package seaweedfs.client; +import org.apache.http.Header; +import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.client.entity.GzipDecompressingEntity; -- cgit v1.2.3 From 0ed1f43d29921180f6de28148379dba3063c4109 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 2 Aug 2020 23:50:23 -0700 Subject: decompress after decrypt if necessary skip any decompress error --- other/java/client/src/main/java/seaweedfs/client/Gzip.java | 14 +++++++++----- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 4 ++++ 2 files changed, 13 insertions(+), 5 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java index 248285dd3..4909094f5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -18,14 +18,18 @@ public class Gzip { return compressed; } - public static byte[] decompress(byte[] compressed) throws IOException { - ByteArrayInputStream bis = new ByteArrayInputStream(compressed); - GZIPInputStream gis = new GZIPInputStream(bis); - return readAll(gis); + public static byte[] decompress(byte[] compressed) { + try { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } catch (Exception e) { + return compressed; + } } private static byte[] readAll(InputStream input) throws IOException { - try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { byte[] buffer = new byte[4096]; int n; while (-1 != (n = input.read(buffer))) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index fa44ee4af..cd2f55678 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -119,6 +119,10 @@ public class SeaweedRead { } } + if (chunkView.isCompressed) { + data = Gzip.decompress(data); + } + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); return data; -- cgit v1.2.3 From 9981748498113759eaedf2efdb93a5b485d8c55a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 2 Aug 2020 23:50:44 -0700 Subject: only try to cache chunk manifest chunks --- .../client/src/main/java/seaweedfs/client/FileChunkManifest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 28c2f47fc..1248ff13f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -76,8 +76,11 @@ public class FileChunkManifest { LOG.debug("doFetchFullChunkData:{}", chunkView); chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); } - LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); - SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + if(chunk.getIsChunkManifest()){ + // only cache manifest chunks + LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); + SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + } return chunkData; -- cgit v1.2.3 From d6073f638670c24f3eb189b7d74dfaebd477fbff Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 00:40:23 -0700 Subject: ensure GC --- other/java/client/src/main/java/seaweedfs/client/ChunkCache.java | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java index 58870d742..7afa2dca0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -15,6 +15,7 @@ public class ChunkCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) + .weakValues() .expireAfterAccess(1, TimeUnit.HOURS) .build(); } -- cgit v1.2.3 From 53190a997261a1d2a1d98d1f64eaea2d6a82124e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:05:49 -0700 Subject: fix compilation --- .../java/client/src/main/java/seaweedfs/client/FileChunkManifest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 1248ff13f..79e8d9bc4 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -76,8 +76,7 @@ public class FileChunkManifest { LOG.debug("doFetchFullChunkData:{}", chunkView); chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); } - if(chunk.getIsChunkManifest()){ - // only cache manifest chunks + if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); } -- cgit v1.2.3 From b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:06:09 -0700 Subject: add read ahead input stream --- .../seaweedfs/client/ReadAheadInputStream.java | 404 +++++++++++++++++++++ 1 file changed, 404 insertions(+) create mode 100644 other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java new file mode 100644 index 000000000..52c7ac09c --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java @@ -0,0 +1,404 @@ +package seaweedfs.client; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by + * maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data + * which should be returned when a read() call is issued. The read ahead buffer is used to + * asynchronously read from the underlying input stream and once the current active buffer is + * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer + * without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Throwable readException; + + @GuardedBy("stateChangeLock") + // whether the close method is called. + private boolean isClosed; + + @GuardedBy("stateChangeLock") + // true when the close method will close the underlying input stream. This is valid only if + // `isClosed` is true. + private boolean isUnderlyingInputStreamBeingClosed; + + @GuardedBy("stateChangeLock") + // whether there is a read ahead task running, + private boolean isReading; + + // whether there is a reader waiting for data. + private AtomicBoolean isWaiting = new AtomicBoolean(false); + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build() + ); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + */ + public ReadAheadInputStream( + InputStream inputStream, int bufferSizeInBytes) { + Preconditions.checkArgument(bufferSizeInBytes > 0, + "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); + activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); + readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); + this.underlyingInputStream = inputStream; + activeBuffer.flip(); + readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { + return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); + } + + private void checkReadException() throws IOException { + if (readAborted) { + Throwables.propagateIfPossible(readException, IOException.class); + throw new IOException(readException); + } + } + + /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ + private void readAsync() throws IOException { + stateChangeLock.lock(); + final byte[] arr = readAheadBuffer.array(); + try { + if (endOfStream || readInProgress) { + return; + } + checkReadException(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + readInProgress = true; + } finally { + stateChangeLock.unlock(); + } + executorService.execute(() -> { + stateChangeLock.lock(); + try { + if (isClosed) { + readInProgress = false; + return; + } + // Flip this so that the close method will not close the underlying input stream when we + // are reading. + isReading = true; + } finally { + stateChangeLock.unlock(); + } + + // Please note that it is safe to release the lock and read into the read ahead buffer + // because either of following two conditions will hold - 1. The active buffer has + // data available to read so the reader will not read from the read ahead buffer. + // 2. This is the first time read is called or the active buffer is exhausted, + // in that case the reader waits for this async read to complete. + // So there is no race condition in both the situations. + int read = 0; + int off = 0, len = arr.length; + Throwable exception = null; + try { + // try to fill the read ahead buffer. + // if a reader is waiting, possibly return early. + do { + read = underlyingInputStream.read(arr, off, len); + if (read <= 0) break; + off += read; + len -= read; + } while (len > 0 && !isWaiting.get()); + } catch (Throwable ex) { + exception = ex; + if (ex instanceof Error) { + // `readException` may not be reported to the user. Rethrow Error to make sure at least + // The user can see Error in UncaughtExceptionHandler. + throw (Error) ex; + } + } finally { + stateChangeLock.lock(); + readAheadBuffer.limit(off); + if (read < 0 || (exception instanceof EOFException)) { + endOfStream = true; + } else if (exception != null) { + readAborted = true; + readException = exception; + } + readInProgress = false; + signalAsyncReadComplete(); + stateChangeLock.unlock(); + closeUnderlyingInputStreamIfNecessary(); + } + }); + } + + private void closeUnderlyingInputStreamIfNecessary() { + boolean needToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + isReading = false; + if (isClosed && !isUnderlyingInputStreamBeingClosed) { + // close method cannot close underlyingInputStream because we were reading. + needToCloseUnderlyingInputStream = true; + } + } finally { + stateChangeLock.unlock(); + } + if (needToCloseUnderlyingInputStream) { + try { + underlyingInputStream.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + } + + private void signalAsyncReadComplete() { + stateChangeLock.lock(); + try { + asyncReadComplete.signalAll(); + } finally { + stateChangeLock.unlock(); + } + } + + private void waitForAsyncReadComplete() throws IOException { + stateChangeLock.lock(); + isWaiting.set(true); + try { + // There is only one reader, and one writer, so the writer should signal only once, + // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. + while (readInProgress) { + asyncReadComplete.await(); + } + } catch (InterruptedException e) { + InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + isWaiting.set(false); + stateChangeLock.unlock(); + } + checkReadException(); + } + + @Override + public int read() throws IOException { + if (activeBuffer.hasRemaining()) { + // short path - just get one byte. + return activeBuffer.get() & 0xFF; + } else { + byte[] oneByteArray = oneByte.get(); + return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; + } + } + + @Override + public int read(byte[] b, int offset, int len) throws IOException { + if (offset < 0 || len < 0 || len > b.length - offset) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + + if (!activeBuffer.hasRemaining()) { + // No remaining in active buffer - lock and switch to write ahead buffer. + stateChangeLock.lock(); + try { + waitForAsyncReadComplete(); + if (!readAheadBuffer.hasRemaining()) { + // The first read. + readAsync(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return -1; + } + } + // Swap the newly read read ahead buffer in place of empty active buffer. + swapBuffers(); + // After swapping buffers, trigger another async read for read ahead buffer. + readAsync(); + } finally { + stateChangeLock.unlock(); + } + } + len = Math.min(len, activeBuffer.remaining()); + activeBuffer.get(b, offset, len); + + return len; + } + + /** + * flip the active and read ahead buffer + */ + private void swapBuffers() { + ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + } + + @Override + public int available() throws IOException { + stateChangeLock.lock(); + // Make sure we have no integer overflow. + try { + return (int) Math.min((long) Integer.MAX_VALUE, + (long) activeBuffer.remaining() + readAheadBuffer.remaining()); + } finally { + stateChangeLock.unlock(); + } + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0L) { + return 0L; + } + if (n <= activeBuffer.remaining()) { + // Only skipping from active buffer is sufficient + activeBuffer.position((int) n + activeBuffer.position()); + return n; + } + stateChangeLock.lock(); + long skipped; + try { + skipped = skipInternal(n); + } finally { + stateChangeLock.unlock(); + } + return skipped; + } + + /** + * Internal skip function which should be called only from skip() api. The assumption is that + * the stateChangeLock is already acquired in the caller before calling this function. + */ + private long skipInternal(long n) throws IOException { + assert (stateChangeLock.isLocked()); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return 0; + } + if (available() >= n) { + // we can skip from the internal buffers + int toSkip = (int) n; + // We need to skip from both active buffer and read ahead buffer + toSkip -= activeBuffer.remaining(); + assert(toSkip > 0); // skipping from activeBuffer already handled. + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(toSkip + readAheadBuffer.position()); + swapBuffers(); + // Trigger async read to emptied read ahead buffer. + readAsync(); + return n; + } else { + int skippedBytes = available(); + long toSkip = n - skippedBytes; + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + long skippedFromInputStream = underlyingInputStream.skip(toSkip); + readAsync(); + return skippedBytes + skippedFromInputStream; + } + } + + @Override + public void close() throws IOException { + boolean isSafeToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + if (isClosed) { + return; + } + isClosed = true; + if (!isReading) { + // Nobody is reading, so we can close the underlying input stream in this method. + isSafeToCloseUnderlyingInputStream = true; + // Flip this to make sure the read ahead task will not close the underlying input stream. + isUnderlyingInputStreamBeingClosed = true; + } + } finally { + stateChangeLock.unlock(); + } + + try { + executorService.shutdownNow(); + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + if (isSafeToCloseUnderlyingInputStream) { + underlyingInputStream.close(); + } + } + } +} -- cgit v1.2.3 From 15dc0a704db7aa542471b56f10ceb749dc041b12 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:11:24 -0700 Subject: Revert "add read ahead input stream" This reverts commit b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6. --- .../seaweedfs/client/ReadAheadInputStream.java | 404 --------------------- 1 file changed, 404 deletions(-) delete mode 100644 other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java deleted file mode 100644 index 52c7ac09c..000000000 --- a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java +++ /dev/null @@ -1,404 +0,0 @@ -package seaweedfs.client; - -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -// package org.apache.spark.io; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.GuardedBy; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * {@link InputStream} implementation which asynchronously reads ahead from the underlying input - * stream when specified amount of data has been read from the current buffer. It does it by - * maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data - * which should be returned when a read() call is issued. The read ahead buffer is used to - * asynchronously read from the underlying input stream and once the current active buffer is - * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer - * without being blocked in disk I/O. - */ -public class ReadAheadInputStream extends InputStream { - - private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); - - private ReentrantLock stateChangeLock = new ReentrantLock(); - - @GuardedBy("stateChangeLock") - private ByteBuffer activeBuffer; - - @GuardedBy("stateChangeLock") - private ByteBuffer readAheadBuffer; - - @GuardedBy("stateChangeLock") - private boolean endOfStream; - - @GuardedBy("stateChangeLock") - // true if async read is in progress - private boolean readInProgress; - - @GuardedBy("stateChangeLock") - // true if read is aborted due to an exception in reading from underlying input stream. - private boolean readAborted; - - @GuardedBy("stateChangeLock") - private Throwable readException; - - @GuardedBy("stateChangeLock") - // whether the close method is called. - private boolean isClosed; - - @GuardedBy("stateChangeLock") - // true when the close method will close the underlying input stream. This is valid only if - // `isClosed` is true. - private boolean isUnderlyingInputStreamBeingClosed; - - @GuardedBy("stateChangeLock") - // whether there is a read ahead task running, - private boolean isReading; - - // whether there is a reader waiting for data. - private AtomicBoolean isWaiting = new AtomicBoolean(false); - - private final InputStream underlyingInputStream; - - private final ExecutorService executorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build() - ); - - private final Condition asyncReadComplete = stateChangeLock.newCondition(); - - private static final ThreadLocal oneByte = ThreadLocal.withInitial(() -> new byte[1]); - - /** - * Creates a ReadAheadInputStream with the specified buffer size and read-ahead - * threshold - * - * @param inputStream The underlying input stream. - * @param bufferSizeInBytes The buffer size. - */ - public ReadAheadInputStream( - InputStream inputStream, int bufferSizeInBytes) { - Preconditions.checkArgument(bufferSizeInBytes > 0, - "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); - activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); - readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); - this.underlyingInputStream = inputStream; - activeBuffer.flip(); - readAheadBuffer.flip(); - } - - private boolean isEndOfStream() { - return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); - } - - private void checkReadException() throws IOException { - if (readAborted) { - Throwables.propagateIfPossible(readException, IOException.class); - throw new IOException(readException); - } - } - - /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ - private void readAsync() throws IOException { - stateChangeLock.lock(); - final byte[] arr = readAheadBuffer.array(); - try { - if (endOfStream || readInProgress) { - return; - } - checkReadException(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - readInProgress = true; - } finally { - stateChangeLock.unlock(); - } - executorService.execute(() -> { - stateChangeLock.lock(); - try { - if (isClosed) { - readInProgress = false; - return; - } - // Flip this so that the close method will not close the underlying input stream when we - // are reading. - isReading = true; - } finally { - stateChangeLock.unlock(); - } - - // Please note that it is safe to release the lock and read into the read ahead buffer - // because either of following two conditions will hold - 1. The active buffer has - // data available to read so the reader will not read from the read ahead buffer. - // 2. This is the first time read is called or the active buffer is exhausted, - // in that case the reader waits for this async read to complete. - // So there is no race condition in both the situations. - int read = 0; - int off = 0, len = arr.length; - Throwable exception = null; - try { - // try to fill the read ahead buffer. - // if a reader is waiting, possibly return early. - do { - read = underlyingInputStream.read(arr, off, len); - if (read <= 0) break; - off += read; - len -= read; - } while (len > 0 && !isWaiting.get()); - } catch (Throwable ex) { - exception = ex; - if (ex instanceof Error) { - // `readException` may not be reported to the user. Rethrow Error to make sure at least - // The user can see Error in UncaughtExceptionHandler. - throw (Error) ex; - } - } finally { - stateChangeLock.lock(); - readAheadBuffer.limit(off); - if (read < 0 || (exception instanceof EOFException)) { - endOfStream = true; - } else if (exception != null) { - readAborted = true; - readException = exception; - } - readInProgress = false; - signalAsyncReadComplete(); - stateChangeLock.unlock(); - closeUnderlyingInputStreamIfNecessary(); - } - }); - } - - private void closeUnderlyingInputStreamIfNecessary() { - boolean needToCloseUnderlyingInputStream = false; - stateChangeLock.lock(); - try { - isReading = false; - if (isClosed && !isUnderlyingInputStreamBeingClosed) { - // close method cannot close underlyingInputStream because we were reading. - needToCloseUnderlyingInputStream = true; - } - } finally { - stateChangeLock.unlock(); - } - if (needToCloseUnderlyingInputStream) { - try { - underlyingInputStream.close(); - } catch (IOException e) { - logger.warn(e.getMessage(), e); - } - } - } - - private void signalAsyncReadComplete() { - stateChangeLock.lock(); - try { - asyncReadComplete.signalAll(); - } finally { - stateChangeLock.unlock(); - } - } - - private void waitForAsyncReadComplete() throws IOException { - stateChangeLock.lock(); - isWaiting.set(true); - try { - // There is only one reader, and one writer, so the writer should signal only once, - // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. - while (readInProgress) { - asyncReadComplete.await(); - } - } catch (InterruptedException e) { - InterruptedIOException iio = new InterruptedIOException(e.getMessage()); - iio.initCause(e); - throw iio; - } finally { - isWaiting.set(false); - stateChangeLock.unlock(); - } - checkReadException(); - } - - @Override - public int read() throws IOException { - if (activeBuffer.hasRemaining()) { - // short path - just get one byte. - return activeBuffer.get() & 0xFF; - } else { - byte[] oneByteArray = oneByte.get(); - return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; - } - } - - @Override - public int read(byte[] b, int offset, int len) throws IOException { - if (offset < 0 || len < 0 || len > b.length - offset) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - - if (!activeBuffer.hasRemaining()) { - // No remaining in active buffer - lock and switch to write ahead buffer. - stateChangeLock.lock(); - try { - waitForAsyncReadComplete(); - if (!readAheadBuffer.hasRemaining()) { - // The first read. - readAsync(); - waitForAsyncReadComplete(); - if (isEndOfStream()) { - return -1; - } - } - // Swap the newly read read ahead buffer in place of empty active buffer. - swapBuffers(); - // After swapping buffers, trigger another async read for read ahead buffer. - readAsync(); - } finally { - stateChangeLock.unlock(); - } - } - len = Math.min(len, activeBuffer.remaining()); - activeBuffer.get(b, offset, len); - - return len; - } - - /** - * flip the active and read ahead buffer - */ - private void swapBuffers() { - ByteBuffer temp = activeBuffer; - activeBuffer = readAheadBuffer; - readAheadBuffer = temp; - } - - @Override - public int available() throws IOException { - stateChangeLock.lock(); - // Make sure we have no integer overflow. - try { - return (int) Math.min((long) Integer.MAX_VALUE, - (long) activeBuffer.remaining() + readAheadBuffer.remaining()); - } finally { - stateChangeLock.unlock(); - } - } - - @Override - public long skip(long n) throws IOException { - if (n <= 0L) { - return 0L; - } - if (n <= activeBuffer.remaining()) { - // Only skipping from active buffer is sufficient - activeBuffer.position((int) n + activeBuffer.position()); - return n; - } - stateChangeLock.lock(); - long skipped; - try { - skipped = skipInternal(n); - } finally { - stateChangeLock.unlock(); - } - return skipped; - } - - /** - * Internal skip function which should be called only from skip() api. The assumption is that - * the stateChangeLock is already acquired in the caller before calling this function. - */ - private long skipInternal(long n) throws IOException { - assert (stateChangeLock.isLocked()); - waitForAsyncReadComplete(); - if (isEndOfStream()) { - return 0; - } - if (available() >= n) { - // we can skip from the internal buffers - int toSkip = (int) n; - // We need to skip from both active buffer and read ahead buffer - toSkip -= activeBuffer.remaining(); - assert(toSkip > 0); // skipping from activeBuffer already handled. - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(toSkip + readAheadBuffer.position()); - swapBuffers(); - // Trigger async read to emptied read ahead buffer. - readAsync(); - return n; - } else { - int skippedBytes = available(); - long toSkip = n - skippedBytes; - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - long skippedFromInputStream = underlyingInputStream.skip(toSkip); - readAsync(); - return skippedBytes + skippedFromInputStream; - } - } - - @Override - public void close() throws IOException { - boolean isSafeToCloseUnderlyingInputStream = false; - stateChangeLock.lock(); - try { - if (isClosed) { - return; - } - isClosed = true; - if (!isReading) { - // Nobody is reading, so we can close the underlying input stream in this method. - isSafeToCloseUnderlyingInputStream = true; - // Flip this to make sure the read ahead task will not close the underlying input stream. - isUnderlyingInputStreamBeingClosed = true; - } - } finally { - stateChangeLock.unlock(); - } - - try { - executorService.shutdownNow(); - executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException e) { - InterruptedIOException iio = new InterruptedIOException(e.getMessage()); - iio.initCause(e); - throw iio; - } finally { - if (isSafeToCloseUnderlyingInputStream) { - underlyingInputStream.close(); - } - } - } -} -- cgit v1.2.3 From 13bfe5deefcb83a5a89f9f15e7a15c29f97b1730 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 16 Aug 2020 21:14:39 -0700 Subject: same logic for reading random access files from Go --- .../main/java/seaweedfs/client/SeaweedRead.java | 55 ++++++++++++++++++---- 1 file changed, 45 insertions(+), 10 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index cd2f55678..045751717 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -23,7 +23,7 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength) throws IOException { + final int bufferLength, final long fileSize) throws IOException { List chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); @@ -42,6 +42,14 @@ public class SeaweedRead { long readCount = 0; int startOffset = bufferOffset; for (ChunkView chunkView : chunkViews) { + + if (startOffset < chunkView.logicOffset) { + long gap = chunkView.logicOffset - startOffset; + LOG.debug("zero [{},{})", startOffset, startOffset + gap); + readCount += gap; + startOffset += gap; + } + FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); if (locations == null || locations.getLocationsCount() == 0) { LOG.error("failed to locate {}", chunkView.fileId); @@ -51,11 +59,22 @@ public class SeaweedRead { int len = readChunkView(position, buffer, startOffset, chunkView, locations); + LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); + readCount += len; startOffset += len; } + long limit = Math.min(bufferLength, fileSize); + + if (startOffset < limit) { + long gap = limit - startOffset; + LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); + readCount += gap; + startOffset += gap; + } + return readCount; } @@ -71,7 +90,7 @@ public class SeaweedRead { int len = (int) chunkView.size; LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); - System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); + System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len); return len; } @@ -93,7 +112,7 @@ public class SeaweedRead { Header contentEncodingHeader = entity.getContentEncoding(); if (contentEncodingHeader != null) { - HeaderElement[] encodings =contentEncodingHeader.getElements(); + HeaderElement[] encodings = contentEncodingHeader.getElements(); for (int i = 0; i < encodings.length; i++) { if (encodings[i].getName().equalsIgnoreCase("gzip")) { entity = new GzipDecompressingEntity(entity); @@ -134,18 +153,19 @@ public class SeaweedRead { long stop = offset + size; for (VisibleInterval chunk : visibleIntervals) { - if (chunk.start <= offset && offset < chunk.stop && offset < stop) { + long chunkStart = Math.max(offset, chunk.start); + long chunkStop = Math.min(stop, chunk.stop); + if (chunkStart < chunkStop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, + chunkStart - chunk.start + chunk.chunkOffset, + chunkStop - chunkStart, + chunkStart, isFullChunk, chunk.cipherKey, chunk.isCompressed )); - offset = Math.min(chunk.stop, stop); } } return views; @@ -160,7 +180,13 @@ public class SeaweedRead { Arrays.sort(chunks, new Comparator() { @Override public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - return (int) (a.getMtime() - b.getMtime()); + // if just a.getMtime() - b.getMtime(), it will overflow! + if (a.getMtime() < b.getMtime()) { + return -1; + } else if (a.getMtime() > b.getMtime()) { + return 1; + } + return 0; } }); @@ -181,6 +207,7 @@ public class SeaweedRead { chunk.getOffset() + chunk.getSize(), chunk.getFileId(), chunk.getMtime(), + 0, true, chunk.getCipherKey().toByteArray(), chunk.getIsCompressed() @@ -203,6 +230,7 @@ public class SeaweedRead { chunk.getOffset(), v.fileId, v.modifiedTime, + v.chunkOffset, false, v.cipherKey, v.isCompressed @@ -215,6 +243,7 @@ public class SeaweedRead { v.stop, v.fileId, v.modifiedTime, + v.chunkOffset + (chunkStop - v.start), false, v.cipherKey, v.isCompressed @@ -247,6 +276,10 @@ public class SeaweedRead { return fileId; } + public static long fileSize(FilerProto.Entry entry) { + return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize()); + } + public static long totalSize(List chunksList) { long size = 0; for (FilerProto.FileChunk chunk : chunksList) { @@ -263,15 +296,17 @@ public class SeaweedRead { public final long stop; public final long modifiedTime; public final String fileId; + public final long chunkOffset; public final boolean isFullChunk; public final byte[] cipherKey; public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; + this.chunkOffset = chunkOffset; this.isFullChunk = isFullChunk; this.cipherKey = cipherKey; this.isCompressed = isCompressed; -- cgit v1.2.3 From ca658a97c5248ba099356b006f0b341af53b0816 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 28 Aug 2020 23:48:48 -0700 Subject: add signatures to messages to avoid double processing --- other/java/client/src/main/proto/filer.proto | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index dcc18f2a5..65947e674 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -102,6 +102,7 @@ message EventNotification { bool delete_chunks = 3; string new_parent_path = 4; bool is_from_other_cluster = 5; + repeated int32 signatures = 6; } message FileChunk { @@ -150,6 +151,7 @@ message CreateEntryRequest { Entry entry = 2; bool o_excl = 3; bool is_from_other_cluster = 4; + repeated int32 signatures = 5; } message CreateEntryResponse { @@ -160,6 +162,7 @@ message UpdateEntryRequest { string directory = 1; Entry entry = 2; bool is_from_other_cluster = 3; + repeated int32 signatures = 4; } message UpdateEntryResponse { } @@ -180,6 +183,7 @@ message DeleteEntryRequest { bool is_recursive = 5; bool ignore_recursive_error = 6; bool is_from_other_cluster = 7; + repeated int32 signatures = 8; } message DeleteEntryResponse { @@ -268,6 +272,7 @@ message SubscribeMetadataRequest { string client_name = 1; string path_prefix = 2; int64 since_ns = 3; + int32 signature = 4; } message SubscribeMetadataResponse { string directory = 1; -- cgit v1.2.3 From bba90ff3c822914a8a2da4369e65756ff366cef2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 5 Sep 2020 22:52:15 -0700 Subject: read filer signature --- other/java/client/src/main/proto/filer.proto | 1 + 1 file changed, 1 insertion(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 65947e674..4d3924bf5 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -266,6 +266,7 @@ message GetFilerConfigurationResponse { uint32 max_mb = 4; string dir_buckets = 5; bool cipher = 7; + int32 signature = 8; } message SubscribeMetadataRequest { -- cgit v1.2.3 From 387ab6796f274151f802ccdab8756b959b5fb1cb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 9 Sep 2020 11:21:23 -0700 Subject: filer: cross cluster synchronization --- other/java/client/src/main/proto/filer.proto | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 4d3924bf5..cf88065ef 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -58,6 +58,12 @@ service SeaweedFiler { rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { } + rpc KvGet (KvGetRequest) returns (KvGetResponse) { + } + + rpc KvPut (KvPutRequest) returns (KvPutResponse) { + } + } ////////////////////////////////////////////////// @@ -308,3 +314,19 @@ message LocateBrokerResponse { } repeated Resource resources = 2; } + +// Key-Value operations +message KvGetRequest { + bytes key = 1; +} +message KvGetResponse { + bytes value = 1; + string error = 2; +} +message KvPutRequest { + bytes key = 1; + bytes value = 2; +} +message KvPutResponse { + string error = 1; +} -- cgit v1.2.3 From cb427d48fa42241b9396fa850e4bf02ecddc21ed Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 17 Sep 2020 06:46:51 -0700 Subject: filer report metrics configuration --- other/java/client/src/main/proto/filer.proto | 2 ++ 1 file changed, 2 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index cf88065ef..9a72bc976 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -273,6 +273,8 @@ message GetFilerConfigurationResponse { string dir_buckets = 5; bool cipher = 7; int32 signature = 8; + string metrics_address = 9; + int32 metrics_interval_sec = 10; } message SubscribeMetadataRequest { -- cgit v1.2.3