aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java23
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java135
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java86
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java17
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java9
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java42
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java5
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java3
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java7
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java8
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java3
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java7
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java8
13 files changed, 277 insertions, 76 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
index 897fe9694..55f003a18 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -1,22 +1,39 @@
package seaweedfs.client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ByteBufferPool {
- static List<ByteBuffer> bufferList = new ArrayList<>();
+ private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+ private static final List<ByteBuffer> bufferList = new ArrayList<>();
public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ bufferSize = MIN_BUFFER_SIZE;
+ }
if (bufferList.isEmpty()) {
return ByteBuffer.allocate(bufferSize);
}
- return bufferList.remove(bufferList.size()-1);
+ ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+ if (buffer.capacity() >= bufferSize) {
+ return buffer;
+ }
+
+ LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+ bufferList.add(0, buffer);
+ return ByteBuffer.allocate(bufferSize);
+
}
public static synchronized void release(ByteBuffer obj) {
- bufferList.add(obj);
+ bufferList.add(0, obj);
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..d8d29ede8
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,135 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileChunkManifest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+ private static final int mergeFactor = 1000;
+
+ public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (chunk.getIsChunkManifest()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static List<FilerProto.FileChunk> resolveChunkManifest(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+ for (FilerProto.FileChunk chunk : chunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ continue;
+ }
+
+ // IsChunkManifest
+ LOG.debug("fetching chunk manifest:{}", chunk);
+ byte[] data = fetchChunk(filerGrpcClient, chunk);
+ FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+ List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk t : m.getChunksList()) {
+ // avoid deprecated chunk.getFileId()
+ resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+ }
+ dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+ }
+
+ return dataChunks;
+ }
+
+ private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+
+ FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+ String vid = "" + chunk.getFid().getVolumeId();
+ lookupRequest.addVolumeIds(vid);
+ FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ .getBlockingStub().lookupVolume(lookupRequest.build());
+ Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+ FilerProto.Locations locations = vid2Locations.get(vid);
+
+ SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+ FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+ 0,
+ -1,
+ 0,
+ true,
+ chunk.getCipherKey().toByteArray(),
+ chunk.getIsCompressed());
+
+ byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+ if (chunkData == null) {
+ LOG.debug("doFetchFullChunkData:{}", chunkView);
+ chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+ }
+ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+ SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+
+ return chunkData;
+
+ }
+
+ public static List<FilerProto.FileChunk> maybeManifestize(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks) throws IOException {
+ // the return variable
+ List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+ List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : inputChunks) {
+ if (!chunk.getIsChunkManifest()) {
+ dataChunks.add(chunk);
+ } else {
+ chunks.add(chunk);
+ }
+ }
+
+ int remaining = dataChunks.size();
+ for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor));
+ chunks.add(chunk);
+ remaining -= mergeFactor;
+ }
+
+ // remaining
+ for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+ chunks.add(dataChunks.get(i));
+ }
+ return chunks;
+ }
+
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException {
+ // create and serialize the manifest
+ dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
+ FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+ byte[] data = m.build().toByteArray();
+
+ long minOffset = Long.MAX_VALUE;
+ long maxOffset = -1;
+ for (FilerProto.FileChunk chunk : dataChunks) {
+ minOffset = Math.min(minOffset, chunk.getOffset());
+ maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+ }
+
+ FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+ filerGrpcClient.getReplication(),
+ filerGrpcClient,
+ minOffset,
+ data, 0, data.length);
+ manifestChunk.setIsChunkManifest(true);
+ manifestChunk.setSize(maxOffset - minOffset);
+ return manifestChunk.build();
+
+ }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 2103fc699..468a95e28 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -24,6 +24,67 @@ public class FilerClient {
this.filerGrpcClient = filerGrpcClient;
}
+ public static String toFileId(FilerProto.FileId fid) {
+ if (fid == null) {
+ return null;
+ }
+ return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+ }
+
+ public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+ if (fileIdStr == null || fileIdStr.length() == 0) {
+ return null;
+ }
+ int commaIndex = fileIdStr.lastIndexOf(',');
+ String volumeIdStr = fileIdStr.substring(0, commaIndex);
+ String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+ String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+ return FilerProto.FileId.newBuilder()
+ .setVolumeId(Integer.parseInt(volumeIdStr))
+ .setFileKey(Long.parseLong(fileKeyStr, 16))
+ .setCookie((int) Long.parseLong(cookieStr, 16))
+ .build();
+ }
+
+ public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+ List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+ for (FilerProto.FileChunk chunk : chunks) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.clearFileId();
+ chunkBuilder.clearSourceFileId();
+ chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+ FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+ if (sourceFid != null) {
+ chunkBuilder.setSourceFid(sourceFid);
+ }
+ cleanedChunks.add(chunkBuilder.build());
+ }
+ return cleanedChunks;
+ }
+
+ public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+ if (entry.getChunksList().size() <= 0) {
+ return entry;
+ }
+ String fileId = entry.getChunks(0).getFileId();
+ if (fileId != null && fileId.length() != 0) {
+ return entry;
+ }
+ FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+ entryBuilder.clearChunks();
+ for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+ FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+ chunkBuilder.setFileId(toFileId(chunk.getFid()));
+ String sourceFileId = toFileId(chunk.getSourceFid());
+ if (sourceFileId != null) {
+ chunkBuilder.setSourceFileId(sourceFileId);
+ }
+ entryBuilder.addChunks(chunkBuilder);
+ }
+ return entryBuilder.build();
+ }
+
public boolean mkdirs(String path, int mode) {
String currentUser = System.getProperty("user.name");
return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -184,7 +245,7 @@ public class FilerClient {
List<FilerProto.Entry> entries = new ArrayList<>();
while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
- entries.add(fixEntryAfterReading(resp.getEntry()));
+ entries.add(afterEntryDeserialization(resp.getEntry()));
}
return entries;
}
@@ -199,7 +260,7 @@ public class FilerClient {
if (entry == null) {
return null;
}
- return fixEntryAfterReading(entry);
+ return afterEntryDeserialization(entry);
} catch (Exception e) {
if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
@@ -209,7 +270,6 @@ public class FilerClient {
}
}
-
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
@@ -267,24 +327,4 @@ public class FilerClient {
return true;
}
- private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
- if (entry.getChunksList().size() <= 0) {
- return entry;
- }
- String fileId = entry.getChunks(0).getFileId();
- if (fileId != null && fileId.length() != 0) {
- return entry;
- }
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- entryBuilder.clearChunks();
- for (FilerProto.FileChunk chunk : entry.getChunksList()) {
- FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
- FilerProto.FileId fid = chunk.getFid();
- fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
- chunkBuilder.setFileId(fileId);
- entryBuilder.addChunks(chunkBuilder);
- }
- return entryBuilder.build();
- }
-
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 301919919..f0490540d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -2,16 +2,12 @@ package seaweedfs.client;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.util.*;
@@ -19,7 +15,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(16);
+ static ChunkCache chunkCache = new ChunkCache(0);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -44,7 +40,8 @@ public class SeaweedRead {
int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) {
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
- if (locations.getLocationsCount() == 0) {
+ if (locations == null || locations.getLocationsCount() == 0) {
+ LOG.error("failed to locate {}", chunkView.fileId);
// log here!
return 0;
}
@@ -77,7 +74,7 @@ public class SeaweedRead {
return len;
}
- private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -138,7 +135,11 @@ public class SeaweedRead {
return views;
}
- public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+ public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+ final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+ chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
index e2835b718..c465d935f 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -9,19 +9,22 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class SeaweedUtil {
static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+ static CloseableHttpClient httpClient;
static {
// Increase max total connection to 200
cm.setMaxTotal(200);
// Increase default max connection per route to 20
cm.setDefaultMaxPerRoute(20);
- }
- public static CloseableHttpClient getClosableHttpClient() {
- return HttpClientBuilder.create()
+ httpClient = HttpClientBuilder.create()
.setConnectionManager(cm)
.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
}
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return httpClient;
+ }
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index e9819668c..fd54453a1 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,8 +1,6 @@
package seaweedfs.client;
import com.google.protobuf.ByteString;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -10,10 +8,10 @@ import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
+import java.util.List;
public class SeaweedWrite {
@@ -25,6 +23,17 @@ public class SeaweedWrite {
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
+ synchronized (entry) {
+ entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength));
+ }
+ }
+
+ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+ final FilerGrpcClient filerGrpcClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength) throws IOException {
FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerGrpcClient.getCollection())
@@ -46,25 +55,28 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- synchronized (entry) {
- entry.addChunks(FilerProto.FileChunk.newBuilder()
- .setFileId(fileId)
- .setOffset(offset)
- .setSize(bytesLength)
- .setMtime(System.currentTimeMillis() / 10000L)
- .setETag(etag)
- .setCipherKey(cipherKeyString)
- );
- }
-
// cache fileId ~ bytes
SeaweedRead.chunkCache.setChunk(fileId, bytes);
+ return FilerProto.FileChunk.newBuilder()
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+ .setMtime(System.currentTimeMillis() / 10000L)
+ .setETag(etag)
+ .setCipherKey(cipherKeyString);
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
- final String parentDirectory, final FilerProto.Entry.Builder entry) {
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
+
+ int chunkSize = entry.getChunksCount();
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
+
synchronized (entry) {
+ entry.clearChunks();
+ entry.addAllChunks(chunks);
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SeaweedReadTest {
@Test
- public void testNonOverlappingVisibleIntervals() {
+ public void testNonOverlappingVisibleIntervals() throws IOException {
List<FilerProto.FileChunk> chunks = new ArrayList<>();
chunks.add(FilerProto.FileChunk.newBuilder()
.setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
.setMtime(2000)
.build());
- List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+ List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
System.out.println("visible:" + visibleInterval);
}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c26ad728f..6b3c72f7d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
- final int bufferSize) {
+ final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
@@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream {
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
}
}
- return (int)bytesRead;
+ return (int) bytesRead;
}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 46de0c443..d62d74fb1 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -56,12 +56,12 @@ public class SeaweedOutputStream extends OutputStream {
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
- 10L,
+ 120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream {
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
@@ -180,7 +180,7 @@ public class SeaweedOutputStream extends OutputStream {
bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
setConf(conf);
this.uri = uri;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c26ad728f..6b3c72f7d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
- final int bufferSize) {
+ final int bufferSize) throws IOException {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
@@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream {
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
}
}
- return (int)bytesRead;
+ return (int) bytesRead;
}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index c602a0d81..05805b9e5 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+ this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
- 10L,
+ 120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
@@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
bufferToWrite.flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {