aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-10-10 16:20:26 +0800
committerGitHub <noreply@github.com>2020-10-10 16:20:26 +0800
commitb9a446839a52739ddfdd072d0208c4dfe659056d (patch)
treee363b33087262ad35c8f2ef96ed400e92101e020 /other/java
parent411e49f96494629fa3da334e8dc7cc15bd4d19cd (diff)
parent912ef2bc531c1bd09b21cadc87b8ea2620311cf3 (diff)
downloadseaweedfs-b9a446839a52739ddfdd072d0208c4dfe659056d.tar.xz
seaweedfs-b9a446839a52739ddfdd072d0208c4dfe659056d.zip
Merge pull request #24 from chrislusf/master
sync
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java54
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java2
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java8
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java5
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java8
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java5
7 files changed, 54 insertions, 30 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 045751717..ab2407dec 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -40,7 +40,7 @@ public class SeaweedRead {
//TODO parallel this
long readCount = 0;
- int startOffset = bufferOffset;
+ long startOffset = position;
for (ChunkView chunkView : chunkViews) {
if (startOffset < chunkView.logicOffset) {
@@ -57,7 +57,7 @@ public class SeaweedRead {
return 0;
}
- int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@@ -66,7 +66,7 @@ public class SeaweedRead {
}
- long limit = Math.min(bufferLength, fileSize);
+ long limit = Math.min(bufferOffset + bufferLength, fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
@@ -78,7 +78,7 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -88,17 +88,51 @@ public class SeaweedRead {
}
int len = (int) chunkView.size;
- LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
- System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len);
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset);
+ System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len);
return len;
}
public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
- HttpGet request = new HttpGet(
- String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+ byte[] data = null;
+ IOException lastException = null;
+ for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
+ for (FilerProto.Location location : locations.getLocationsList()) {
+ String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
+ try {
+ data = doFetchOneFullChunkData(chunkView, url);
+ lastException = null;
+ break;
+ } catch (IOException ioe) {
+ LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
+ lastException = ioe;
+ }
+ }
+ if (data != null) {
+ break;
+ }
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ if (lastException != null) {
+ throw lastException;
+ }
+
+ LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
+ return data;
+
+ }
+
+ public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
+
+ HttpGet request = new HttpGet(url);
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
@@ -142,7 +176,7 @@ public class SeaweedRead {
data = Gzip.decompress(data);
}
- LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+ LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length);
return data;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 6551548fa..44c2ef053 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
+ FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 3dc38fe1e..eec5bd2d3 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -205,10 +205,9 @@ public class SeaweedFileSystemStore {
}
- public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
- int bufferSize) throws IOException {
+ public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
- LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+ LOG.debug("openFileForRead path:{}", path);
FilerProto.Entry entry = lookupEntry(path);
@@ -219,8 +218,7 @@ public class SeaweedFileSystemStore {
return new SeaweedInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
- entry,
- bufferSize);
+ entry);
}
public void setOwner(Path path, String owner, String group) {
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 36c0766a4..8bda2e092 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream {
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
- private final int bufferSize; // default buffer size
private long position = 0; // cursor of the file
@@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream {
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
- final FilerProto.Entry entry,
- final int bufferSize) throws IOException {
+ final FilerProto.Entry entry) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry);
- this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 6551548fa..44c2ef053 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
+ FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index c76160bd2..7ec91929c 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -205,10 +205,9 @@ public class SeaweedFileSystemStore {
}
- public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
- int bufferSize) throws IOException {
+ public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
- LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+ LOG.debug("openFileForRead path:{}", path);
FilerProto.Entry entry = lookupEntry(path);
@@ -219,8 +218,7 @@ public class SeaweedFileSystemStore {
return new SeaweedInputStream(filerGrpcClient,
statistics,
path.toUri().getPath(),
- entry,
- bufferSize);
+ entry);
}
public void setOwner(Path path, String owner, String group) {
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 36c0766a4..8bda2e092 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream {
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
- private final int bufferSize; // default buffer size
private long position = 0; // cursor of the file
@@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream {
final FilerGrpcClient filerGrpcClient,
final Statistics statistics,
final String path,
- final FilerProto.Entry entry,
- final int bufferSize) throws IOException {
+ final FilerProto.Entry entry) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry);
- this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());