diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-07-21 14:08:18 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-21 14:08:18 +0800 |
| commit | 6ea4ce722704171fdbddba61f423202a620b6ecf (patch) | |
| tree | b4e3e1deb1133cc7d34757c0981e30dabe196a03 /other/java/hdfs3/src | |
| parent | 5850bb733936399babbe2d77f4b27cac312e2798 (diff) | |
| parent | 885c624bceb61688c806b91350e70d75088c6eea (diff) | |
| download | seaweedfs-6ea4ce722704171fdbddba61f423202a620b6ecf.tar.xz seaweedfs-6ea4ce722704171fdbddba61f423202a620b6ecf.zip | |
Merge pull request #3 from chrislusf/master
sync
Diffstat (limited to 'other/java/hdfs3/src')
3 files changed, 7 insertions, 11 deletions
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 85490c181..2341d335d 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem { public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; private URI uri; private Path workingDirectory = new Path("/"); @@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - setConf(conf); this.uri = uri; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index c26ad728f..6b3c72f7d 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,7 +2,6 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream { final Statistics statistics, final String path, final FilerProto.Entry entry, - final int bufferSize) { + final int bufferSize) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; @@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream { this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.bufferSize = bufferSize; - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream { } } - return (int)bytesRead; + return (int) bytesRead; } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index c602a0d81..05805b9e5 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, - 10L, + 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); @@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea break; } - // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")"); + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); outputIndex += writableBytes; currentOffset += writableBytes; @@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea bufferToWrite.flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } final Future<Void> job = completionService.submit(() -> { |
