aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-07-20 20:27:19 -0700
committerChris Lu <chris.lu@gmail.com>2020-07-20 20:27:19 -0700
commitae3e6d824499b499b3b53726e7f25751756456ae (patch)
tree744632a0cc4f74ab4505f81fb995c85ca4587eb9 /other/java
parent0b2e06268b0d43c902754c34d3040092a7c83876 (diff)
downloadseaweedfs-ae3e6d824499b499b3b53726e7f25751756456ae.tar.xz
seaweedfs-ae3e6d824499b499b3b53726e7f25751756456ae.zip
remove changing buffer size
Diffstat (limited to 'other/java')
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java3
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java6
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java3
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java5
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java8
5 files changed, 9 insertions, 16 deletions
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 209e32d0b..d62d74fb1 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -56,12 +56,12 @@ public class SeaweedOutputStream extends OutputStream {
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
- 10L,
+ 120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -180,7 +180,7 @@ public class SeaweedOutputStream extends OutputStream {
bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 10ec9b3cc..6b3c72f7d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
- final int bufferSize) {
+ final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
}
}
- return (int)bytesRead;
+ return (int) bytesRead;
}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index c602a0d81..05805b9e5 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
- 10L,
+ 120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
@@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {