author    yourchanges <yourchanges@gmail.com>  2020-07-17 19:49:16 +0800
committer GitHub <noreply@github.com>          2020-07-17 19:49:16 +0800
commit    64df5207db14ccf7e7915561b5c9b8f3dab53c6e (patch)
tree      df4a05999d67abb7c4765d39eddc01318521169b /other/java
parent    8c318470dd95b3fc24d39dc3cf253cc17b03ab39 (diff)
parent    f43146b237bc5bbfb7033f6e427b5299554c0824 (diff)
Merge pull request #2 from chrislusf/master
merge
Diffstat (limited to 'other/java')
-rw-r--r--  other/java/client/pom.xml                                                |   2
-rw-r--r--  other/java/client/pom.xml.deploy                                         | 170
-rw-r--r--  other/java/client/pom_debug.xml                                          |   2
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java    |  22
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/ChunkCache.java        |  11
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerClient.java       |   5
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java   |   4
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java       |  41
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java       |  27
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java      |  20
-rw-r--r--  other/java/client/src/main/proto/filer.proto                            |   5
-rw-r--r--  other/java/hdfs2/dependency-reduced-pom.xml                             |   2
-rw-r--r--  other/java/hdfs2/pom.xml                                                |   2
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java             | 137
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java      | 394
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java       |  70
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java (renamed from other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java) | 22
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java      |  28
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java |  12
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java     | 183
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java    |  89
-rw-r--r--  other/java/hdfs3/dependency-reduced-pom.xml                             |   2
-rw-r--r--  other/java/hdfs3/pom.xml                                                |   2
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java             | 137
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java      | 394
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java       |  70
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java) | 22
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java      |  26
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java |  12
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java     | 183
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java    |  98
31 files changed, 484 insertions(+), 1710 deletions(-)
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 05061e0f6..62134715f 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.9</version>
+ <version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
new file mode 100644
index 000000000..62134715f
--- /dev/null
+++ b/other/java/client/pom.xml.deploy
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.github.chrislusf</groupId>
+ <artifactId>seaweedfs-client</artifactId>
+ <version>1.3.6</version>
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>9</version>
+ </parent>
+
+ <properties>
+ <protobuf.version>3.9.1</protobuf.version>
+ <!-- follow https://github.com/grpc/grpc-java -->
+ <grpc.version>1.23.0</grpc.version>
+ <guava.version>28.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.moandjiezana.toml</groupId>
+ <artifactId>toml4j</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <extensions>true</extensions>
+ <configuration>
+ <serverId>ossrh</serverId>
+ <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+ <autoReleaseAfterClose>true</autoReleaseAfterClose>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-javadocs</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 1d8454bf7..dcedc2aa6 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.2.9</version>
+ <version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..897fe9694
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,22 @@
+package seaweedfs.client;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ static List<ByteBuffer> bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ return bufferList.remove(bufferList.size()-1);
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ bufferList.add(obj);
+ }
+
+}
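
Note: ByteBufferPool trades allocation for reuse; request() hands back a previously released buffer when one is available and only allocates on a cold pool. A minimal usage sketch (the try/finally pairing and the 8 MiB size are assumptions, not part of this diff):

    import java.nio.ByteBuffer;

    public class ByteBufferPoolExample {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBufferPool.request(8 * 1024 * 1024);
            try {
                buf.clear();         // pooled buffers keep their previous position/limit
                buf.put((byte) 42);  // ... fill and flush as needed ...
            } finally {
                ByteBufferPool.release(buf); // return the buffer for later requests
            }
        }
    }

Note that request(bufferSize) only honors bufferSize when the free list is empty; released buffers are handed back regardless of capacity, so all callers are expected to share one buffer size.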
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index e249d4524..58870d742 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -7,9 +7,12 @@ import java.util.concurrent.TimeUnit;
public class ChunkCache {
- private final Cache<String, byte[]> cache;
+ private Cache<String, byte[]> cache = null;
public ChunkCache(int maxEntries) {
+ if (maxEntries == 0) {
+ return;
+ }
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.expireAfterAccess(1, TimeUnit.HOURS)
@@ -17,10 +20,16 @@ public class ChunkCache {
}
public byte[] getChunk(String fileId) {
+ if (this.cache == null) {
+ return null;
+ }
return this.cache.getIfPresent(fileId);
}
public void setChunk(String fileId, byte[] data) {
+ if (this.cache == null) {
+ return;
+ }
this.cache.put(fileId, data);
}
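
Note: constructing ChunkCache with maxEntries == 0 now turns it into a no-op, so getChunk() always returns null and setChunk() drops the data, and callers need no special casing. A small sketch of both modes (the file id and payload are illustrative):

    static void demo() {
        byte[] data = new byte[]{1, 2, 3};

        // Enabled: entries retained up to maxEntries, expiring one hour after last access.
        ChunkCache cache = new ChunkCache(16);
        cache.setChunk("3,01637037d6", data);          // file id is illustrative
        byte[] hit = cache.getChunk("3,01637037d6");   // returns data

        // Disabled: the Guava cache is never built.
        ChunkCache off = new ChunkCache(0);
        off.setChunk("3,01637037d6", data);            // silently dropped
        byte[] miss = off.getChunk("3,01637037d6");    // always null
    }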
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index ef32c7e9a..2103fc699 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -156,7 +156,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
- List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+ List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@@ -173,11 +173,12 @@ public class FilerClient {
return results;
}
- public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
+ public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
+ .setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
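
Note: listEntries() gains an includeLastEntry flag, wired to the new inclusive_start_from field of ListEntriesRequest; the internal pager above passes false so each page resumes strictly after the previous page's last file name. A paging sketch under the same assumption (filerClient is an existing FilerClient):

    import java.util.List;
    import seaweedfs.client.FilerClient;
    import seaweedfs.client.FilerProto;

    static void listAll(FilerClient filerClient) {
        String lastFileName = "";
        List<FilerProto.Entry> page;
        do {
            // includeLastEntry=false: start strictly after lastFileName
            page = filerClient.listEntries("/some/dir", "", lastFileName, 1024, false);
            for (FilerProto.Entry entry : page) {
                lastFileName = entry.getName();
                // ... process entry ...
            }
        } while (!page.isEmpty());
    }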
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3f5d1e8e9..57b67f6b0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -39,8 +39,10 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+ ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+ .maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
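
Note: both channel flavors now raise gRPC's inbound message cap (4 MiB by default) to 1 GiB, so large listings and chunk-heavy entries fit in a single response. An equivalent standalone construction of the plaintext channel, as a sketch (host and port are placeholders):

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    static ManagedChannel plaintextChannel(String host, int grpcPort) {
        return ManagedChannelBuilder.forAddress(host, grpcPort)
                .usePlaintext()
                .maxInboundMessageSize(1024 * 1024 * 1024) // 1 GiB; gRPC default is 4 MiB
                .build();
    }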
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 7be39da53..301919919 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -4,6 +4,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
@@ -18,7 +19,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
- static ChunkCache chunkCache = new ChunkCache(1000);
+ static ChunkCache chunkCache = new ChunkCache(16);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -78,7 +79,6 @@ public class SeaweedRead {
private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
- HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -86,20 +86,21 @@ public class SeaweedRead {
byte[] data = null;
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
try {
- HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
data = EntityUtils.toByteArray(entity);
+ EntityUtils.consume(entity);
+
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ request.releaseConnection();
}
- if (chunkView.isGzipped) {
+ if (chunkView.isCompressed) {
data = Gzip.decompress(data);
}
@@ -129,7 +130,7 @@ public class SeaweedRead {
offset,
isFullChunk,
chunk.cipherKey,
- chunk.isGzipped
+ chunk.isCompressed
));
offset = Math.min(chunk.stop, stop);
}
@@ -165,7 +166,7 @@ public class SeaweedRead {
chunk.getMtime(),
true,
chunk.getCipherKey().toByteArray(),
- chunk.getIsGzipped()
+ chunk.getIsCompressed()
);
// easy cases to speed up
@@ -187,7 +188,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
@@ -199,7 +200,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
- v.isGzipped
+ v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -247,16 +248,16 @@ public class SeaweedRead {
public final String fileId;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -268,7 +269,7 @@ public class SeaweedRead {
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
@@ -280,16 +281,16 @@ public class SeaweedRead {
public final long logicOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
- public final boolean isGzipped;
+ public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
- this.isGzipped = isGzipped;
+ this.isCompressed = isCompressed;
}
@Override
@@ -301,7 +302,7 @@ public class SeaweedRead {
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
- ", isGzipped=" + isGzipped +
+ ", isCompressed=" + isCompressed +
'}';
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..e2835b718
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,27 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
+ static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+
+ static {
+ // Increase max total connection to 200
+ cm.setMaxTotal(200);
+ // Increase default max connection per route to 20
+ cm.setDefaultMaxPerRoute(20);
+ }
+
+ public static CloseableHttpClient getClosableHttpClient() {
+ return HttpClientBuilder.create()
+ .setConnectionManager(cm)
+ .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+ .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+ .build();
+ }
+}
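
Note: SeaweedUtil replaces the per-request DefaultHttpClient with one shared PoolingHttpClientConnectionManager; every client returned by getClosableHttpClient() draws from the same pool, so connections stay alive across requests. Callers must fully consume the entity and release the connection, as the updated SeaweedRead and SeaweedWrite do. A caller-side sketch (the URL is a placeholder):

    import java.io.IOException;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.util.EntityUtils;

    static byte[] fetch(String url) throws IOException {
        HttpGet request = new HttpGet(url);
        CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
        try {
            return EntityUtils.toByteArray(response.getEntity()); // reads the body fully
        } finally {
            response.close();            // lets the connection return to the pool
            request.releaseConnection(); // frees the route for other requests
        }
    }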
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 18ec77b76..e9819668c 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -3,10 +3,11 @@ package seaweedfs.client;
import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@@ -16,7 +17,7 @@ import java.security.SecureRandom;
public class SeaweedWrite {
- private static SecureRandom random = new SecureRandom();
+ private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
@@ -63,7 +64,7 @@ public class SeaweedWrite {
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory, final FilerProto.Entry.Builder entry) {
- synchronized (entry){
+ synchronized (entry) {
filerGrpcClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
@@ -79,8 +80,6 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
- HttpClient client = new DefaultHttpClient();
-
InputStream inputStream = null;
if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@@ -103,8 +102,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
+ CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
try {
- HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@@ -112,12 +112,12 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
+ EntityUtils.consume(response.getEntity());
+
return etag;
} finally {
- if (client instanceof Closeable) {
- Closeable t = (Closeable) client;
- t.close();
- }
+ response.close();
+ post.releaseConnection();
}
}
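
Note: the added EntityUtils.consume(response.getEntity()) matters for pooling, since a connection is only returned to the pool once its response body has been fully read; an unconsumed entity forces the connection to be discarded instead. The ETag is also de-quoted before being returned; an equivalent standalone helper (hypothetical name, guard mirrors the usual quoted form):

    // ETags arrive wrapped in double quotes per RFC 7232.
    static String stripQuotes(String etag) {
        if (etag != null && etag.length() >= 2
                && etag.startsWith("\"") && etag.endsWith("\"")) {
            return etag.substring(1, etag.length() - 1);
        }
        return etag;
    }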
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 37121f29c..dcc18f2a5 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -115,6 +115,11 @@ message FileChunk {
FileId source_fid = 8;
bytes cipher_key = 9;
bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+ repeated FileChunk chunks = 1;
}
message FileId {
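
Note: the schema now lets a FileChunk act as a manifest; when is_chunk_manifest is set, the chunk's content is itself a serialized FileChunkManifest listing the real data chunks. This diff only adds the schema; a reader-side expansion sketch using the generated classes (hypothetical helper, nested manifests omitted):

    import com.google.protobuf.InvalidProtocolBufferException;
    import java.util.Collections;
    import java.util.List;

    static List<FilerProto.FileChunk> expand(FilerProto.FileChunk chunk, byte[] content)
            throws InvalidProtocolBufferException {
        if (!chunk.getIsChunkManifest()) {
            return Collections.singletonList(chunk); // ordinary data chunk
        }
        // content holds the fetched bytes of the manifest chunk
        return FilerProto.FileChunkManifest.parseFrom(content).getChunksList();
    }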
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 53fb62186..218021a58 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 0d5b138d5..94f80a114 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
- private SeaweedInputStream stream;
- private long offset; // offset within the file for the buffer
- private int length; // actual length, set after the buffer is filled
- private int requestedLength; // requested length of the read
- private byte[] buffer; // the buffer itself
- private int bufferindex = -1; // index in the buffers array in Buffer manager
- private ReadBufferStatus status; // status of the buffer
- private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
- // waiting on this buffer gets unblocked
-
- // fields to help with eviction logic
- private long timeStamp = 0; // tick at which buffer became available to read
- private boolean isFirstByteConsumed = false;
- private boolean isLastByteConsumed = false;
- private boolean isAnyByteConsumed = false;
-
- public SeaweedInputStream getStream() {
- return stream;
- }
-
- public void setStream(SeaweedInputStream stream) {
- this.stream = stream;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public int getLength() {
- return length;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public int getRequestedLength() {
- return requestedLength;
- }
-
- public void setRequestedLength(int requestedLength) {
- this.requestedLength = requestedLength;
- }
-
- public byte[] getBuffer() {
- return buffer;
- }
-
- public void setBuffer(byte[] buffer) {
- this.buffer = buffer;
- }
-
- public int getBufferindex() {
- return bufferindex;
- }
-
- public void setBufferindex(int bufferindex) {
- this.bufferindex = bufferindex;
- }
-
- public ReadBufferStatus getStatus() {
- return status;
- }
-
- public void setStatus(ReadBufferStatus status) {
- this.status = status;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public boolean isFirstByteConsumed() {
- return isFirstByteConsumed;
- }
-
- public void setFirstByteConsumed(boolean isFirstByteConsumed) {
- this.isFirstByteConsumed = isFirstByteConsumed;
- }
-
- public boolean isLastByteConsumed() {
- return isLastByteConsumed;
- }
-
- public void setLastByteConsumed(boolean isLastByteConsumed) {
- this.isLastByteConsumed = isLastByteConsumed;
- }
-
- public boolean isAnyByteConsumed() {
- return isAnyByteConsumed;
- }
-
- public void setAnyByteConsumed(boolean isAnyByteConsumed) {
- this.isAnyByteConsumed = isAnyByteConsumed;
- }
-
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java
deleted file mode 100644
index 5b1e21529..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package seaweed.hdfs;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * The Read Buffer Manager for Rest AbfsClient.
- */
-final class ReadBufferManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-
- private static final int NUM_BUFFERS = 16;
- private static final int BLOCK_SIZE = 4 * 1024 * 1024;
- private static final int NUM_THREADS = 8;
- private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
-
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
- private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
- static {
- BUFFER_MANAGER = new ReadBufferManager();
- BUFFER_MANAGER.init();
- }
-
- static ReadBufferManager getBufferManager() {
- return BUFFER_MANAGER;
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("SeaweedFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- }
-
-
- /*
- *
- * SeaweedInputStream-facing methods
- *
- */
-
-
- /**
- * {@link SeaweedInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
- * @param requestedOffset The offset in the file which should be read
- * @param requestedLength The length to read
- */
- void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
-
-
- /**
- * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a
- * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
- * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
- * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
- *
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read
- */
- int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
-
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
- *
- */
-
- private void waitForProcess(final SeaweedInputStream stream, final long position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if in in-progress queue, then block for it
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's thread
- // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
- // is done processing it (in doneReading). There, the latch is set after removing the buffer from
- // inProgressList. So this latch is safe to be outside the synchronized block.
- // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
- // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
- * then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
-
- /**
- * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list.
- * The objective is to find just one buffer - there is no advantage to evicting more than one.
- *
- * @return whether the eviction succeeded - i.e., were we able to free up one buffer
- */
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
-
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- long earliestBirthday = Long.MAX_VALUE;
- for (ReadBuffer buf : completedReadList) {
- if (buf.getTimeStamp() < earliestBirthday) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- }
- }
- if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- freeList.push(buf.getBufferindex());
- completedReadList.remove(buf);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
-
- private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
- final byte[] buffer) {
- ReadBuffer buf = getFromList(completedReadList, stream, position);
- if (buf == null || position >= buf.getOffset() + buf.getLength()) {
- return 0;
- }
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
-
- /*
- *
- * ReadBufferWorker-thread-facing methods
- *
- */
-
- /**
- * ReadBufferWorker thread calls this to get the next buffer that it should work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
- */
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
-
- /**
- * ReadBufferWorker thread calls this method to post completion.
- *
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
- */
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
- }
- synchronized (this) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setTimeStamp(currentTimeMillis());
- buffer.setLength(bytesActuallyRead);
- completedReadList.add(buffer);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
- }
- }
- //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
-
- /**
- * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
- * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
- * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
- * Note: it is not monotonic across Sockets, and even within a CPU, it's only the
- * more recent parts which share a clock across all cores.
- *
- * @return current time in milliseconds
- */
- private long currentTimeMillis() {
- return System.nanoTime() / 1000 / 1000;
- }
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java
deleted file mode 100644
index 6ffbc4644..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBufferWorker implements Runnable {
-
- protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
- private int id;
-
- ReadBufferWorker(final int id) {
- this.id = id;
- }
-
- /**
- * return the ID of ReadBufferWorker.
- */
- public int getId() {
- return this.id;
- }
-
- /**
- * Waits until a buffer becomes available in ReadAheadQueue.
- * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
- * Rinse and repeat. Forever.
- */
- public void run() {
- try {
- UNLEASH_WORKERS.await();
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- ReadBuffer buffer;
- while (true) {
- try {
- buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- if (buffer != null) {
- try {
- // do the actual read, from the file.
- int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
- bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
- } catch (Exception ex) {
- bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
- }
- }
- }
- }
-}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
index d63674977..e021401aa 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
@@ -18,12 +18,18 @@
package seaweed.hdfs;
-/**
- * The ReadBufferStatus for Rest AbfsClient
- */
-public enum ReadBufferStatus {
- NOT_AVAILABLE, // buffers sitting in readaheadqueue have this status
- READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
- AVAILABLE, // data is available in buffer. It should be in completedList
- READ_FAILED // read completed, but failed.
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
+
+ SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
+ throws IOException, URISyntaxException {
+ super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
+ }
+
}
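
Note: the new SeaweedAbstractFileSystem bridges Hadoop's FileContext API (AbstractFileSystem) to the existing SeaweedFileSystem under the seaweedfs scheme named in the super() call. A configuration sketch; the key names are assumptions based on Hadoop's fs.AbstractFileSystem.<scheme>.impl convention, not part of this diff:

    import org.apache.hadoop.conf.Configuration;

    static Configuration seaweedConf() {
        Configuration conf = new Configuration();
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        conf.set("fs.AbstractFileSystem.seaweedfs.impl",
                "seaweed.hdfs.SeaweedAbstractFileSystem");
        return conf;
    }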
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index d471d8440..85490c181 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -10,6 +10,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -22,7 +23,7 @@ import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
+public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
@@ -144,7 +145,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
- public boolean rename(Path src, Path dst) {
+ public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename path: {} => {}", src, dst);
@@ -155,12 +156,13 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
if (src.equals(dst)) {
return true;
}
- FileStatus dstFileStatus = getFileStatus(dst);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
- String sourceFileName = src.getName();
Path adjustedDst = dst;
- if (dstFileStatus != null) {
+ if (entry != null) {
+ FileStatus dstFileStatus = getFileStatus(dst);
+ String sourceFileName = src.getName();
if (!dstFileStatus.isDirectory()) {
return false;
}
@@ -175,18 +177,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
- public boolean delete(Path path, boolean recursive) {
+ public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
return true;
}
+ FileStatus fileStatus = getFileStatus(path);
+
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@@ -222,9 +226,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
@@ -233,6 +237,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
+ FileStatus fileStatus = getFileStatus(path);
+
if (fileStatus.isDirectory()) {
return true;
} else {
@@ -241,7 +247,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
- public FileStatus getFileStatus(Path path) {
+ public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus path: {}", path);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 9617a38be..d9c6d6f0d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -61,7 +61,7 @@ public class SeaweedFileSystemStore {
);
}
- public FileStatus[] listEntries(final Path path) {
+ public FileStatus[] listEntries(final Path path) throws IOException {
LOG.debug("listEntries path: {}", path);
FileStatus pathStatus = getFileStatus(path);
@@ -89,11 +89,11 @@ public class SeaweedFileSystemStore {
}
- public FileStatus getFileStatus(final Path path) {
+ public FileStatus getFileStatus(final Path path) throws IOException {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
- return null;
+ throw new FileNotFoundException("File does not exist: " + path);
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
@@ -136,7 +136,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
- private FilerProto.Entry lookupEntry(Path path) {
+ public FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
@@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
- int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
@@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
statistics,
path.toUri().getPath(),
entry,
- bufferSize,
- readAheadQueueDepth);
+ bufferSize);
}
public void setOwner(Path path, String owner, String group) {
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 90c14c772..c26ad728f 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream {
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
private final int bufferSize; // default buffer size
- private final int readAheadQueueDepth; // initialized in constructor
- private final boolean readAheadEnabled; // whether enable readAhead;
- private byte[] buffer = null; // will be initialized on first use
+ private long position = 0; // cursor of the file
- private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
- private long fCursorAfterLastRead = -1;
- private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
- private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
- // of valid bytes in buffer)
private boolean closed = false;
public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry,
- final int bufferSize,
- final int readAheadQueueDepth) {
+ final FilerGrpcClient filerGrpcClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry,
+ final int bufferSize) {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
- this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
- this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
@@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
- int currentOff = off;
- int currentLen = len;
- int lastReadBytes;
- int totalReadBytes = 0;
- do {
- lastReadBytes = readOneBlock(b, currentOff, currentLen);
- if (lastReadBytes > 0) {
- currentOff += lastReadBytes;
- currentLen -= lastReadBytes;
- totalReadBytes += lastReadBytes;
- }
- if (currentLen <= 0 || currentLen > b.length - currentOff) {
- break;
- }
- } while (lastReadBytes > 0);
- return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
- }
-
- private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
-
- Preconditions.checkNotNull(b);
-
- if (len == 0) {
- return 0;
- }
-
- if (this.available() == 0) {
- return -1;
- }
-
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- //If buffer is empty, then fill the buffer.
- if (bCursor == limit) {
- //If EOF, then return -1
- if (fCursor >= contentLength) {
- return -1;
- }
-
- long bytesRead = 0;
- //reset buffer to initial state - i.e., throw away existing data
- bCursor = 0;
- limit = 0;
- if (buffer == null) {
- buffer = new byte[bufferSize];
- }
-
- // Enable readAhead when reading sequentially
- if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
- }
-
- if (bytesRead == -1) {
- return -1;
- }
- limit += bytesRead;
- fCursor += bytesRead;
- fCursorAfterLastRead = fCursor;
- }
-
- //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
- //(bytes returned may be less than requested)
- int bytesRemaining = limit - bCursor;
- int bytesToRead = Math.min(len, bytesRemaining);
- System.arraycopy(buffer, bCursor, b, off, bytesToRead);
- bCursor += bytesToRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesToRead);
- }
- return bytesToRead;
- }
-
-
- private int readInternal(final long position, final byte[] b, final int offset, final int length,
- final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
- // try reading from read-ahead
- if (offset != 0) {
- throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
- }
- int receivedBytes;
-
- // queue read-aheads
- int numReadAheads = this.readAheadQueueDepth;
- long nextSize;
- long nextOffset = position;
- while (numReadAheads > 0 && nextOffset < contentLength) {
- nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
- ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
- nextOffset = nextOffset + nextSize;
- numReadAheads--;
- }
-
- // try reading from buffers first
- receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
- if (receivedBytes > 0) {
- return receivedBytes;
- }
-
- // got nothing from read-ahead, do our own read now
- receivedBytes = readRemote(position, b, offset, length);
- return receivedBytes;
- } else {
- return readRemote(position, b, offset, length);
- }
- }
-
- int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
@@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
- if (offset >= b.length) {
+ if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
- if (length < 0) {
+ if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
- if (length > (b.length - offset)) {
+ if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
- long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
+ long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
- return (int) bytesRead;
+
+ if (bytesRead > 0) {
+ this.position += bytesRead;
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return (int)bytesRead;
+
}
/**
@@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
- if (n >= fCursor - limit && n <= fCursor) { // within buffer
- bCursor = (int) (n - (fCursor - limit));
- return;
- }
-
- // next read will read from here
- fCursor = n;
+ this.position = n;
- //invalidate buffer
- limit = 0;
- bCursor = 0;
}
@Override
@@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
- long currentPos = getPos();
- if (currentPos == contentLength) {
+ if (this.position == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
- long newPos = currentPos + n;
+ long newPos = this.position + n;
if (newPos < 0) {
newPos = 0;
- n = newPos - currentPos;
+ n = newPos - this.position;
}
if (newPos > contentLength) {
newPos = contentLength;
- n = newPos - currentPos;
+ n = newPos - this.position;
}
seek(newPos);
return n;
@@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream {
public synchronized int available() throws IOException {
if (closed) {
throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
+ FSExceptionMessages.STREAM_IS_CLOSED);
}
final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
+ ? (int) remaining : Integer.MAX_VALUE;
}
/**
@@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
- return fCursor - limit + bCursor;
+ return position;
}
/**
@@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized void close() throws IOException {
closed = true;
- buffer = null; // de-reference the buffer so it can be GC'ed sooner
}
/**
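Taken together, the hunks above reduce the hdfs2 input stream to a single position cursor as its only read state. A condensed sketch of the resulting read path, reassembled from the diff (argument validation trimmed for brevity; not a new API):

    // Condensed sketch, reassembled from the hunks above.
    public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
        long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, off, len);
        if (bytesRead > 0) {
            position += bytesRead;                   // the cursor is the only read state
            if (statistics != null) {
                statistics.incrementBytesRead(bytesRead);
            }
        }
        return (int) bytesRead;
    }

    public synchronized void seek(long n) throws IOException {
        this.position = n;                           // no buffer window to adjust or invalidate
    }

    public synchronized long getPos() throws IOException {
        return position;                             // was: fCursor - limit + bCursor
    }

Every read now goes straight to the filer, trading the removed read-ahead cache for much simpler state: seek() becomes a plain assignment instead of a buffer-window check.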
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index e08843caa..1138ecca2 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@@ -14,6 +15,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -28,16 +30,16 @@ public class SeaweedOutputStream extends OutputStream {
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
- private FilerProto.Entry.Builder entry;
+ private final FilerProto.Entry.Builder entry;
+ private final boolean supportFlush = false; // true;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
- private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private ByteBuffer buffer;
+ private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -50,18 +52,18 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
@@ -84,7 +86,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
+ throws IOException {
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
@@ -93,25 +95,29 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
+ int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
+
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
}
- writableBytes = bufferSize - bufferIndex;
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
}
+
}
/**
@@ -147,8 +153,9 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ ByteBufferPool.release(buffer);
buffer = null;
- bufferIndex = 0;
+ outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@@ -158,35 +165,17 @@ public class SeaweedOutputStream extends OutputStream {
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
+ if (buffer.position() == 0) {
return;
}
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
-
- buffer = new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
+ buffer.flip();
+ int bytesLength = buffer.limit() - buffer.position();
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
+ // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
position += bytesLength;
+ buffer.clear();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {
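The new write path keeps one pooled ByteBuffer and drains it synchronously in writeCurrentBufferToService(). Because the flip()/clear() cycle is easy to get wrong, here is a standalone JDK-only sketch of just that cycle (illustrative, not SeaweedFS code):

    import java.nio.ByteBuffer;

    public class FlipClearDemo {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(8);
            buffer.put(new byte[]{1, 2, 3});   // position = 3, limit = 8 (filling)
            buffer.flip();                     // position = 0, limit = 3 (draining)
            int bytesLength = buffer.limit() - buffer.position();  // 3, as computed in the diff
            byte[] out = new byte[bytesLength];
            System.arraycopy(buffer.array(), buffer.position(), out, 0, bytesLength);
            buffer.clear();                    // position = 0, limit = 8 (ready to refill)
            System.out.println(bytesLength);   // prints 3
        }
    }

This mirrors the order in the diff: flip before handing buffer.array() with [position, limit) to SeaweedWrite.writeData, clear afterwards so write() can keep appending.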
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index f5d14acdd..00e236aa2 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 8c88b60df..a03068a48 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
- private SeaweedInputStream stream;
- private long offset; // offset within the file for the buffer
- private int length; // actual length, set after the buffer is filled
- private int requestedLength; // requested length of the read
- private byte[] buffer; // the buffer itself
- private int bufferindex = -1; // index in the buffers array in the buffer manager
- private ReadBufferStatus status; // status of the buffer
- private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
- // waiting on this buffer gets unblocked
-
- // fields to help with eviction logic
- private long timeStamp = 0; // tick at which buffer became available to read
- private boolean isFirstByteConsumed = false;
- private boolean isLastByteConsumed = false;
- private boolean isAnyByteConsumed = false;
-
- public SeaweedInputStream getStream() {
- return stream;
- }
-
- public void setStream(SeaweedInputStream stream) {
- this.stream = stream;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public int getLength() {
- return length;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public int getRequestedLength() {
- return requestedLength;
- }
-
- public void setRequestedLength(int requestedLength) {
- this.requestedLength = requestedLength;
- }
-
- public byte[] getBuffer() {
- return buffer;
- }
-
- public void setBuffer(byte[] buffer) {
- this.buffer = buffer;
- }
-
- public int getBufferindex() {
- return bufferindex;
- }
-
- public void setBufferindex(int bufferindex) {
- this.bufferindex = bufferindex;
- }
-
- public ReadBufferStatus getStatus() {
- return status;
- }
-
- public void setStatus(ReadBufferStatus status) {
- this.status = status;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public boolean isFirstByteConsumed() {
- return isFirstByteConsumed;
- }
-
- public void setFirstByteConsumed(boolean isFirstByteConsumed) {
- this.isFirstByteConsumed = isFirstByteConsumed;
- }
-
- public boolean isLastByteConsumed() {
- return isLastByteConsumed;
- }
-
- public void setLastByteConsumed(boolean isLastByteConsumed) {
- this.isLastByteConsumed = isLastByteConsumed;
- }
-
- public boolean isAnyByteConsumed() {
- return isAnyByteConsumed;
- }
-
- public void setAnyByteConsumed(boolean isAnyByteConsumed) {
- this.isAnyByteConsumed = isAnyByteConsumed;
- }
-
-}
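The latch field above is the handoff point between a worker thread filling the buffer and a reader blocked on it. A standalone sketch of that pattern (plain JDK, hypothetical names):

    import java.util.concurrent.CountDownLatch;

    public class LatchHandoffDemo {
        public static void main(String[] args) throws InterruptedException {
            final CountDownLatch done = new CountDownLatch(1);
            final byte[] result = new byte[4];

            new Thread(() -> {
                result[0] = 42;     // simulate the worker filling the buffer
                done.countDown();   // wake up anyone blocked in await()
            }).start();

            done.await();                    // blocks until the worker finishes
            System.out.println(result[0]);   // 42: countDown() happens-before await() returning
        }
    }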
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
deleted file mode 100644
index 5b1e21529..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package seaweed.hdfs;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * The Read Buffer Manager for Rest AbfsClient.
- */
-final class ReadBufferManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-
- private static final int NUM_BUFFERS = 16;
- private static final int BLOCK_SIZE = 4 * 1024 * 1024;
- private static final int NUM_THREADS = 8;
- private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
-
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
- private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
- static {
- BUFFER_MANAGER = new ReadBufferManager();
- BUFFER_MANAGER.init();
- }
-
- static ReadBufferManager getBufferManager() {
- return BUFFER_MANAGER;
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("SeaweedFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- }
-
-
- /*
- *
- * SeaweedInputStream-facing methods
- *
- */
-
-
- /**
- * {@link SeaweedInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
- * @param requestedOffset The offset in the file which should be read
- * @param requestedLength The length to read
- */
- void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
-
-
- /**
- * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
- * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
- * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
- * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
- *
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read
- */
- int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
-
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
- *
- */
-
- private void waitForProcess(final SeaweedInputStream stream, final long position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if it is in the in-progress queue, block until it completes
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's thread
- // Note on correctness: readBuf gets out of inProgressList in only one place: after the worker thread
- // is done processing it (in doneReading). There, the latch is counted down after removing the buffer from
- // inProgressList. So this latch is safe to await outside the synchronized block.
- // Awaiting it inside synchronized would result in a deadlock, since this thread would be holding the lock
- // while waiting, so no one would be able to change any state. If this becomes more complex in the future,
- // the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
-
- /**
- * If any buffer in the completed list can be reclaimed, reclaim it and return the buffer to the free list.
- * The objective is to find just one buffer - there is no advantage to evicting more than one.
- *
- * @return whether the eviction succeeded - i.e., whether we were able to free up one buffer
- */
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
-
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- long earliestBirthday = Long.MAX_VALUE;
- for (ReadBuffer buf : completedReadList) {
- if (buf.getTimeStamp() < earliestBirthday) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- }
- }
- if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- freeList.push(buf.getBufferindex());
- completedReadList.remove(buf);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
-
- private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
- final byte[] buffer) {
- ReadBuffer buf = getFromList(completedReadList, stream, position);
- if (buf == null || position >= buf.getOffset() + buf.getLength()) {
- return 0;
- }
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
-
- /*
- *
- * ReadBufferWorker-thread-facing methods
- *
- */
-
- /**
- * ReadBufferWorker thread calls this to get the next buffer that it should work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
- */
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
-
- /**
- * ReadBufferWorker thread calls this method to post completion.
- *
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
- */
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
- }
- synchronized (this) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setTimeStamp(currentTimeMillis());
- buffer.setLength(bytesActuallyRead);
- completedReadList.add(buffer);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and become eligible for GC
- }
- }
- // outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
-
- /**
- * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
- * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization),
- * making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
- * Note: it is not monotonic across sockets, and even within a CPU, it's only the
- * more recent parts that share a clock across all cores.
- *
- * @return current time in milliseconds
- */
- private long currentTimeMillis() {
- return System.nanoTime() / 1000 / 1000;
- }
-}
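The currentTimeMillis() helper above deserves a second look on its own; a standalone sketch of interval measurement with it (plain JDK):

    public class MonotonicClockDemo {
        // Same trick as the deleted helper: nanoTime() does not jump backwards
        // with clock changes, so differences are safe for measuring intervals.
        private static long currentTimeMillis() {
            return System.nanoTime() / 1000 / 1000;
        }

        public static void main(String[] args) throws InterruptedException {
            long start = currentTimeMillis();
            Thread.sleep(100);
            long elapsedMs = currentTimeMillis() - start;  // never negative, unlike wall-clock deltas
            System.out.println("elapsed ms: " + elapsedMs);
        }
    }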
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
deleted file mode 100644
index 6ffbc4644..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBufferWorker implements Runnable {
-
- protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
- private int id;
-
- ReadBufferWorker(final int id) {
- this.id = id;
- }
-
- /**
- * return the ID of ReadBufferWorker.
- */
- public int getId() {
- return this.id;
- }
-
- /**
- * Waits until a buffer becomes available in ReadAheadQueue.
- * Once a buffer becomes available, reads the file specified in it and then posts results back to the buffer manager.
- * Rinse and repeat. Forever.
- */
- public void run() {
- try {
- UNLEASH_WORKERS.await();
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- ReadBuffer buffer;
- while (true) {
- try {
- buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- if (buffer != null) {
- try {
- // do the actual read, from the file.
- int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
- bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
- } catch (Exception ex) {
- bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
- }
- }
- }
- }
-}
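The worker loop above pairs with ReadBufferManager.getNextBlockToRead(), a classic wait()/notifyAll() consumer. A minimal standalone sketch of that queue discipline (plain JDK, hypothetical names):

    import java.util.LinkedList;
    import java.util.Queue;

    public class WaitNotifyQueueDemo {
        private final Queue<String> queue = new LinkedList<>();

        public synchronized void put(String item) {
            queue.add(item);
            notifyAll();                  // wake any thread parked in take()
        }

        public synchronized String take() throws InterruptedException {
            while (queue.isEmpty()) {     // loop guards against spurious wakeups
                wait();                   // releases the monitor while blocked
            }
            return queue.remove();
        }

        public static void main(String[] args) throws InterruptedException {
            WaitNotifyQueueDemo q = new WaitNotifyQueueDemo();
            new Thread(() -> q.put("buffer-0")).start();
            System.out.println(q.take()); // blocks until the producer runs
        }
    }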
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
index d63674977..e021401aa 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
@@ -18,12 +18,18 @@
package seaweed.hdfs;
-/**
- * The ReadBufferStatus for Rest AbfsClient
- */
-public enum ReadBufferStatus {
- NOT_AVAILABLE, // buffers sitting in the read-ahead queue have this status
- READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
- AVAILABLE, // data is available in buffer. It should be in completedList
- READ_FAILED // read completed, but failed.
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
+
+ SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
+ throws IOException, URISyntaxException {
+ super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
+ }
+
}
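SeaweedAbstractFileSystem makes the filesystem usable through Hadoop's newer FileContext API. The diff does not show the wiring, but by Hadoop's fs.AbstractFileSystem.&lt;scheme&gt;.impl convention it would presumably be registered in core-site.xml like this (assumed, not part of this commit):

    <property>
      <name>fs.AbstractFileSystem.seaweedfs.impl</name>
      <value>seaweed.hdfs.SeaweedAbstractFileSystem</value>
    </property>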
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index c12da8261..85490c181 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -10,6 +10,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -144,7 +145,7 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
- public boolean rename(Path src, Path dst) {
+ public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename path: {} => {}", src, dst);
@@ -155,12 +156,13 @@ public class SeaweedFileSystem extends FileSystem {
if (src.equals(dst)) {
return true;
}
- FileStatus dstFileStatus = getFileStatus(dst);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
- String sourceFileName = src.getName();
Path adjustedDst = dst;
- if (dstFileStatus != null) {
+ if (entry != null) {
+ FileStatus dstFileStatus = getFileStatus(dst);
+ String sourceFileName = src.getName();
if (!dstFileStatus.isDirectory()) {
return false;
}
@@ -175,18 +177,20 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
- public boolean delete(Path path, boolean recursive) {
+ public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
return true;
}
+ FileStatus fileStatus = getFileStatus(path);
+
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@@ -222,9 +226,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
- FileStatus fileStatus = getFileStatus(path);
+ FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
- if (fileStatus == null) {
+ if (entry == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
@@ -233,6 +237,8 @@ public class SeaweedFileSystem extends FileSystem {
}
+ FileStatus fileStatus = getFileStatus(path);
+
if (fileStatus.isDirectory()) {
return true;
} else {
@@ -241,7 +247,7 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
- public FileStatus getFileStatus(Path path) {
+ public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus path: {}", path);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 9617a38be..d9c6d6f0d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -61,7 +61,7 @@ public class SeaweedFileSystemStore {
);
}
- public FileStatus[] listEntries(final Path path) {
+ public FileStatus[] listEntries(final Path path) throws IOException {
LOG.debug("listEntries path: {}", path);
FileStatus pathStatus = getFileStatus(path);
@@ -89,11 +89,11 @@ public class SeaweedFileSystemStore {
}
- public FileStatus getFileStatus(final Path path) {
+ public FileStatus getFileStatus(final Path path) throws IOException {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
- return null;
+ throw new FileNotFoundException("File does not exist: " + path);
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
@@ -136,7 +136,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
- private FilerProto.Entry lookupEntry(Path path) {
+ public FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
@@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
- int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
@@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
statistics,
path.toUri().getPath(),
entry,
- bufferSize,
- readAheadQueueDepth);
+ bufferSize);
}
public void setOwner(Path path, String owner, String group) {
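Making lookupEntry() public and having getFileStatus() throw FileNotFoundException (per the Hadoop FileSystem contract) is what enables the rename/delete/mkdirs rewrites above: existence checks use the null-returning lookup, and getFileStatus() is only called once the entry is known to exist. The resulting delete(), reassembled from the hunks above:

    @Override
    public boolean delete(Path path, boolean recursive) throws IOException {
        path = qualify(path);
        FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
        if (entry == null) {
            return true;    // deleting a missing path is a no-op success
        }
        FileStatus fileStatus = getFileStatus(path);  // safe: the entry exists
        return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
    }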
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 90c14c772..c26ad728f 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream {
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
private final int bufferSize; // default buffer size
- private final int readAheadQueueDepth; // initialized in constructor
- private final boolean readAheadEnabled; // whether enable readAhead;
- private byte[] buffer = null; // will be initialized on first use
+ private long position = 0; // cursor of the file
- private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
- private long fCursorAfterLastRead = -1;
- private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
- private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
- // of valid bytes in buffer)
private boolean closed = false;
public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry,
- final int bufferSize,
- final int readAheadQueueDepth) {
+ final FilerGrpcClient filerGrpcClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry,
+ final int bufferSize) {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
- this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
- this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
@@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
- int currentOff = off;
- int currentLen = len;
- int lastReadBytes;
- int totalReadBytes = 0;
- do {
- lastReadBytes = readOneBlock(b, currentOff, currentLen);
- if (lastReadBytes > 0) {
- currentOff += lastReadBytes;
- currentLen -= lastReadBytes;
- totalReadBytes += lastReadBytes;
- }
- if (currentLen <= 0 || currentLen > b.length - currentOff) {
- break;
- }
- } while (lastReadBytes > 0);
- return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
- }
-
- private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
-
- Preconditions.checkNotNull(b);
-
- if (len == 0) {
- return 0;
- }
-
- if (this.available() == 0) {
- return -1;
- }
-
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- //If buffer is empty, then fill the buffer.
- if (bCursor == limit) {
- //If EOF, then return -1
- if (fCursor >= contentLength) {
- return -1;
- }
-
- long bytesRead = 0;
- //reset buffer to initial state - i.e., throw away existing data
- bCursor = 0;
- limit = 0;
- if (buffer == null) {
- buffer = new byte[bufferSize];
- }
-
- // Enable readAhead when reading sequentially
- if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
- }
-
- if (bytesRead == -1) {
- return -1;
- }
- limit += bytesRead;
- fCursor += bytesRead;
- fCursorAfterLastRead = fCursor;
- }
-
- //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
- //(bytes returned may be less than requested)
- int bytesRemaining = limit - bCursor;
- int bytesToRead = Math.min(len, bytesRemaining);
- System.arraycopy(buffer, bCursor, b, off, bytesToRead);
- bCursor += bytesToRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesToRead);
- }
- return bytesToRead;
- }
-
-
- private int readInternal(final long position, final byte[] b, final int offset, final int length,
- final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
- // try reading from read-ahead
- if (offset != 0) {
- throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
- }
- int receivedBytes;
-
- // queue read-aheads
- int numReadAheads = this.readAheadQueueDepth;
- long nextSize;
- long nextOffset = position;
- while (numReadAheads > 0 && nextOffset < contentLength) {
- nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
- ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
- nextOffset = nextOffset + nextSize;
- numReadAheads--;
- }
-
- // try reading from buffers first
- receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
- if (receivedBytes > 0) {
- return receivedBytes;
- }
-
- // got nothing from read-ahead, do our own read now
- receivedBytes = readRemote(position, b, offset, length);
- return receivedBytes;
- } else {
- return readRemote(position, b, offset, length);
- }
- }
-
- int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
@@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
- if (offset >= b.length) {
+ if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
- if (length < 0) {
+ if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
- if (length > (b.length - offset)) {
+ if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
- long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
+ long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
- return (int) bytesRead;
+
+ if (bytesRead > 0) {
+ this.position += bytesRead;
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return (int)bytesRead;
+
}
/**
@@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
- if (n >= fCursor - limit && n <= fCursor) { // within buffer
- bCursor = (int) (n - (fCursor - limit));
- return;
- }
-
- // next read will read from here
- fCursor = n;
+ this.position = n;
- //invalidate buffer
- limit = 0;
- bCursor = 0;
}
@Override
@@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
- long currentPos = getPos();
- if (currentPos == contentLength) {
+ if (this.position == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
- long newPos = currentPos + n;
+ long newPos = this.position + n;
if (newPos < 0) {
newPos = 0;
- n = newPos - currentPos;
+ n = newPos - this.position;
}
if (newPos > contentLength) {
newPos = contentLength;
- n = newPos - currentPos;
+ n = newPos - this.position;
}
seek(newPos);
return n;
@@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream {
public synchronized int available() throws IOException {
if (closed) {
throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
+ FSExceptionMessages.STREAM_IS_CLOSED);
}
final long remaining = this.contentLength - this.getPos();
return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
+ ? (int) remaining : Integer.MAX_VALUE;
}
/**
@@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
- return fCursor - limit + bCursor;
+ return position;
}
/**
@@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized void close() throws IOException {
closed = true;
- buffer = null; // de-reference the buffer so it can be GC'ed sooner
}
/**
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 96af27fe0..9ea26776b 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@@ -16,14 +17,10 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Locale;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -37,16 +34,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
- private FilerProto.Entry.Builder entry;
+ private final FilerProto.Entry.Builder entry;
+ private final boolean supportFlush = false; // true;
+ private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
- private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private ByteBuffer buffer;
+ private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -59,18 +56,18 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
@@ -93,7 +90,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
@Override
public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
+ throws IOException {
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
@@ -102,25 +99,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
throw new IndexOutOfBoundsException();
}
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
+ int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
+
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
}
- writableBytes = bufferSize - bufferIndex;
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
}
+
}
/**
@@ -199,8 +200,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ ByteBufferPool.release(buffer);
buffer = null;
- bufferIndex = 0;
+ outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@@ -210,35 +212,17 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
+ if (buffer.position() == 0) {
return;
}
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
-
- buffer = new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
+ buffer.flip();
+ int bytesLength = buffer.limit() - buffer.position();
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
+ // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
position += bytesLength;
+ buffer.clear();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {
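Both output streams now lease their buffer from seaweedfs.client.ByteBufferPool (request() in the constructor, release() on close). The pool's implementation is not part of this excerpt; a hypothetical sketch of what a request()/release() pair can look like, which the real class may differ from:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Queue;

    public class ByteBufferPoolSketch {
        private static final Queue<ByteBuffer> POOL = new ArrayDeque<>();

        public static synchronized ByteBuffer request(int bufferSize) {
            ByteBuffer buffer = POOL.poll();
            if (buffer == null || buffer.capacity() < bufferSize) {
                return ByteBuffer.allocate(bufferSize);  // miss: allocate fresh
            }
            buffer.clear();                              // reuse: reset position and limit
            return buffer;
        }

        public static synchronized void release(ByteBuffer buffer) {
            if (buffer != null) {
                POOL.offer(buffer);                      // keep for reuse instead of letting it be GC'ed
            }
        }
    }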