aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-10-10 00:57:54 -0700
committerChris Lu <chris.lu@gmail.com>2020-10-10 00:57:57 -0700
commit00a75d7c99e87f543a4e0a4390732bb2eaa286fe (patch)
tree9316e7bebeee48667c458cb1adf818783b50ed35
parente1a3ffcdbf7981473398e9526c6e0d8cb0fb24a0 (diff)
downloadseaweedfs-00a75d7c99e87f543a4e0a4390732bb2eaa286fe.tar.xz
seaweedfs-00a75d7c99e87f543a4e0a4390732bb2eaa286fe.zip
Hadoop: fix reading file tail
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java22
1 files changed, 14 insertions, 8 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 7e5c5cb88..cc8befb7a 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -40,7 +40,7 @@ public class SeaweedRead {
//TODO parallel this
long readCount = 0;
- int startOffset = bufferOffset;
+ long startOffset = position;
for (ChunkView chunkView : chunkViews) {
if (startOffset < chunkView.logicOffset) {
@@ -57,7 +57,7 @@ public class SeaweedRead {
return 0;
}
- int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@@ -66,7 +66,7 @@ public class SeaweedRead {
}
- long limit = Math.min(bufferLength, fileSize);
+ long limit = Math.min(bufferOffset + bufferLength, fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
@@ -78,7 +78,7 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -88,9 +88,9 @@ public class SeaweedRead {
}
int len = (int) chunkView.size;
- LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
- System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len);
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.offset+chunkView.size, bufOffset, bufOffset+len, buffer.length, startOffset);
+ System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int)bufOffset, len);
return len;
}
@@ -98,7 +98,8 @@ public class SeaweedRead {
public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] data = null;
- for (long waitTime = 230L; waitTime < 20 * 1000; waitTime += waitTime / 2) {
+ IOException lastException = null;
+ for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
for (FilerProto.Location location : locations.getLocationsList()) {
String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
try {
@@ -106,6 +107,7 @@ public class SeaweedRead {
break;
} catch (IOException ioe) {
LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
+ lastException = ioe;
}
}
if (data != null) {
@@ -117,6 +119,10 @@ public class SeaweedRead {
}
}
+ if (data == null) {
+ throw lastException;
+ }
+
LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
return data;