Diffstat (limited to 'other/java')
31 files changed, 484 insertions, 1710 deletions
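One API change in this set is worth calling out before the diff itself: FilerClient.listEntries gains a fifth parameter, includeLastEntry, wired to the gRPC setInclusiveStartFrom flag (see the FilerClient hunk below). The following is a minimal paging sketch against the new signature, not taken verbatim from the patch; it assumes the page-advance convention used inside FilerClient's own listing loop, where each page starts strictly after the last name seen:

import java.util.List;

import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto;

public class ListAllExample {
    // Walk a directory page by page with the new 5-arg listEntries.
    // Passing includeLastEntry=false makes each page begin strictly
    // after lastFileName, so no entry is returned twice.
    static void listAll(FilerClient filerClient, String path) {
        String lastFileName = "";
        while (true) {
            List<FilerProto.Entry> page =
                    filerClient.listEntries(path, "", lastFileName, 1024, false);
            if (page == null || page.isEmpty()) {
                break;
            }
            for (FilerProto.Entry entry : page) {
                System.out.println(entry.getName());
            }
            lastFileName = page.get(page.size() - 1).getName();
        }
    }
}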
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 05061e0f6..62134715f 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.2.9</version> + <version>1.3.6</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy new file mode 100644 index 000000000..62134715f --- /dev/null +++ b/other/java/client/pom.xml.deploy @@ -0,0 +1,170 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.github.chrislusf</groupId> + <artifactId>seaweedfs-client</artifactId> + <version>1.3.6</version> + + <parent> + <groupId>org.sonatype.oss</groupId> + <artifactId>oss-parent</artifactId> + <version>9</version> + </parent> + + <properties> + <protobuf.version>3.9.1</protobuf.version> + <!-- follow https://github.com/grpc/grpc-java --> + <grpc.version>1.23.0</grpc.version> + <guava.version>28.0-jre</guava.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.moandjiezana.toml</groupId> + <artifactId>toml4j</artifactId> + <version>0.7.2</version> + </dependency> + <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>4.5.6</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + + <distributionManagement> + <snapshotRepository> + <id>ossrh</id> + <url>https://oss.sonatype.org/content/repositories/snapshots</url> + </snapshotRepository> + </distributionManagement> + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.6.2</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> + <target>8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.6.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> + <pluginId>grpc-java</pluginId> + 
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-gpg-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>sign-artifacts</id> + <phase>verify</phase> + <goals> + <goal>sign</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.sonatype.plugins</groupId> + <artifactId>nexus-staging-maven-plugin</artifactId> + <version>1.6.7</version> + <extensions>true</extensions> + <configuration> + <serverId>ossrh</serverId> + <nexusUrl>https://oss.sonatype.org/</nexusUrl> + <autoReleaseAfterClose>true</autoReleaseAfterClose> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.2.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar-no-fork</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9.1</version> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 1d8454bf7..dcedc2aa6 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.2.9</version> + <version>1.3.6</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java new file mode 100644 index 000000000..897fe9694 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java @@ -0,0 +1,22 @@ +package seaweedfs.client; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ByteBufferPool { + + static List<ByteBuffer> bufferList = new ArrayList<>(); + + public static synchronized ByteBuffer request(int bufferSize) { + if (bufferList.isEmpty()) { + return ByteBuffer.allocate(bufferSize); + } + return bufferList.remove(bufferList.size()-1); + } + + public static synchronized void release(ByteBuffer obj) { + bufferList.add(obj); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java index e249d4524..58870d742 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -7,9 +7,12 @@ import java.util.concurrent.TimeUnit; public class ChunkCache { - private final Cache<String, byte[]> cache; + private Cache<String, byte[]> cache = null; public ChunkCache(int maxEntries) { + if (maxEntries == 0) { + return; + } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) .expireAfterAccess(1, TimeUnit.HOURS) @@ -17,10 +20,16 @@ public class ChunkCache { } public byte[] getChunk(String fileId) { + if (this.cache == null) { + return null; + } return this.cache.getIfPresent(fileId); } public void 
setChunk(String fileId, byte[] data) { + if (this.cache == null) { + return; + } this.cache.put(fileId, data); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index ef32c7e9a..2103fc699 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -156,7 +156,7 @@ public class FilerClient { List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>(); String lastFileName = ""; for (int limit = Integer.MAX_VALUE; limit > 0; ) { - List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024); + List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false); if (t == null) { break; } @@ -173,11 +173,12 @@ public class FilerClient { return results; } - public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) { + public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) { Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) + .setInclusiveStartFrom(includeLastEntry) .setLimit(limit) .build()); List<FilerProto.Entry> entries = new ArrayList<>(); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 3f5d1e8e9..57b67f6b0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -39,8 +39,10 @@ public class FilerGrpcClient { public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) { this(sslContext == null ? 
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() : + ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() + .maxInboundMessageSize(1024 * 1024 * 1024) : NettyChannelBuilder.forAddress(host, grpcPort) + .maxInboundMessageSize(1024 * 1024 * 1024) .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 7be39da53..301919919 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -4,6 +4,7 @@ import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; @@ -18,7 +19,7 @@ public class SeaweedRead { private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); - static ChunkCache chunkCache = new ChunkCache(1000); + static ChunkCache chunkCache = new ChunkCache(16); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, @@ -78,7 +79,6 @@ public class SeaweedRead { private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { - HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); @@ -86,20 +86,21 @@ public class SeaweedRead { byte[] data = null; + CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request); + try { - HttpResponse response = client.execute(request); HttpEntity entity = response.getEntity(); data = EntityUtils.toByteArray(entity); + EntityUtils.consume(entity); + } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); - } + response.close(); + request.releaseConnection(); } - if (chunkView.isGzipped) { + if (chunkView.isCompressed) { data = Gzip.decompress(data); } @@ -129,7 +130,7 @@ public class SeaweedRead { offset, isFullChunk, chunk.cipherKey, - chunk.isGzipped + chunk.isCompressed )); offset = Math.min(chunk.stop, stop); } @@ -165,7 +166,7 @@ public class SeaweedRead { chunk.getMtime(), true, chunk.getCipherKey().toByteArray(), - chunk.getIsGzipped() + chunk.getIsCompressed() ); // easy cases to speed up @@ -187,7 +188,7 @@ public class SeaweedRead { v.modifiedTime, false, v.cipherKey, - v.isGzipped + v.isCompressed )); } long chunkStop = chunk.getOffset() + chunk.getSize(); @@ -199,7 +200,7 @@ public class SeaweedRead { v.modifiedTime, false, v.cipherKey, - v.isGzipped + v.isCompressed )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -247,16 +248,16 @@ public class SeaweedRead { public final String fileId; public final boolean isFullChunk; public final byte[] cipherKey; - public final boolean isGzipped; + public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = 
modifiedTime; this.fileId = fileId; this.isFullChunk = isFullChunk; this.cipherKey = cipherKey; - this.isGzipped = isGzipped; + this.isCompressed = isCompressed; } @Override @@ -268,7 +269,7 @@ public class SeaweedRead { ", fileId='" + fileId + '\'' + ", isFullChunk=" + isFullChunk + ", cipherKey=" + Arrays.toString(cipherKey) + - ", isGzipped=" + isGzipped + + ", isCompressed=" + isCompressed + '}'; } } @@ -280,16 +281,16 @@ public class SeaweedRead { public final long logicOffset; public final boolean isFullChunk; public final byte[] cipherKey; - public final boolean isGzipped; + public final boolean isCompressed; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; this.isFullChunk = isFullChunk; this.cipherKey = cipherKey; - this.isGzipped = isGzipped; + this.isCompressed = isCompressed; } @Override @@ -301,7 +302,7 @@ public class SeaweedRead { ", logicOffset=" + logicOffset + ", isFullChunk=" + isFullChunk + ", cipherKey=" + Arrays.toString(cipherKey) + - ", isGzipped=" + isGzipped + + ", isCompressed=" + isCompressed + '}'; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java new file mode 100644 index 000000000..e2835b718 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java @@ -0,0 +1,27 @@ +package seaweedfs.client; + +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; + +public class SeaweedUtil { + + static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); + + static { + // Increase max total connection to 200 + cm.setMaxTotal(200); + // Increase default max connection per route to 20 + cm.setDefaultMaxPerRoute(20); + } + + public static CloseableHttpClient getClosableHttpClient() { + return HttpClientBuilder.create() + .setConnectionManager(cm) + .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE) + .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE) + .build(); + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 18ec77b76..e9819668c 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -3,10 +3,11 @@ package seaweedfs.client; import com.google.protobuf.ByteString; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; -import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; import java.io.ByteArrayInputStream; import java.io.Closeable; @@ -16,7 +17,7 @@ import java.security.SecureRandom; public class SeaweedWrite { - private 
static SecureRandom random = new SecureRandom(); + private static final SecureRandom random = new SecureRandom(); public static void writeData(FilerProto.Entry.Builder entry, final String replication, @@ -63,7 +64,7 @@ public class SeaweedWrite { public static void writeMeta(final FilerGrpcClient filerGrpcClient, final String parentDirectory, final FilerProto.Entry.Builder entry) { - synchronized (entry){ + synchronized (entry) { filerGrpcClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) @@ -79,8 +80,6 @@ public class SeaweedWrite { final long bytesOffset, final long bytesLength, byte[] cipherKey) throws IOException { - HttpClient client = new DefaultHttpClient(); - InputStream inputStream = null; if (cipherKey == null || cipherKey.length == 0) { inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); @@ -103,8 +102,9 @@ public class SeaweedWrite { .addBinaryBody("upload", inputStream) .build()); + CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post); + try { - HttpResponse response = client.execute(post); String etag = response.getLastHeader("ETag").getValue(); @@ -112,12 +112,12 @@ public class SeaweedWrite { etag = etag.substring(1, etag.length() - 1); } + EntityUtils.consume(response.getEntity()); + return etag; } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); - } + response.close(); + post.releaseConnection(); } } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 37121f29c..dcc18f2a5 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -115,6 +115,11 @@ message FileChunk { FileId source_fid = 8; bytes cipher_key = 9; bool is_compressed = 10; + bool is_chunk_manifest = 11; // content is a list of FileChunks +} + +message FileChunkManifest { + repeated FileChunk chunks = 1; } message FileId { diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 53fb62186..218021a58 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
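The recurring theme in the client hunks above is connection reuse: SeaweedRead and SeaweedWrite stop constructing a DefaultHttpClient per request and instead share the pooled client from the new SeaweedUtil (200 connections total, 20 per route), draining and releasing each response so the underlying connection goes back to the pool. A condensed sketch of that fetch pattern, with the URL left as a placeholder:

import java.io.IOException;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import seaweedfs.client.SeaweedUtil;

public class PooledFetchExample {
    // Same shape as doFetchFullChunkData after this change: shared pooled
    // client, consume the entity, then close the response and release the
    // connection so it can be reused instead of torn down.
    static byte[] fetch(String url) throws IOException {
        HttpGet request = new HttpGet(url);
        CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
        try {
            HttpEntity entity = response.getEntity();
            byte[] data = EntityUtils.toByteArray(entity);
            EntityUtils.consume(entity); // fully drain so the pooled connection is reusable
            return data;
        } finally {
            response.close();
            request.releaseConnection();
        }
    }
}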
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 0d5b138d5..94f80a114 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.2.9</seaweedfs.client.version> + <seaweedfs.client.version>1.3.6</seaweedfs.client.version> <hadoop.version>2.9.2</hadoop.version> </properties> diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java deleted file mode 100644 index 926d0b83b..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBuffer { - - private SeaweedInputStream stream; - private long offset; // offset within the file for the buffer - private int length; // actual length, set after the buffer is filles - private int requestedLength; // requested length of the read - private byte[] buffer; // the buffer itself - private int bufferindex = -1; // index in the buffers array in Buffer manager - private ReadBufferStatus status; // status of the buffer - private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client - // waiting on this buffer gets unblocked - - // fields to help with eviction logic - private long timeStamp = 0; // tick at which buffer became available to read - private boolean isFirstByteConsumed = false; - private boolean isLastByteConsumed = false; - private boolean isAnyByteConsumed = false; - - public SeaweedInputStream getStream() { - return stream; - } - - public void setStream(SeaweedInputStream stream) { - this.stream = stream; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; - } - - public int getRequestedLength() { - return requestedLength; - } - - public void setRequestedLength(int requestedLength) { - this.requestedLength = requestedLength; - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getBufferindex() { - return bufferindex; - } - - public void setBufferindex(int bufferindex) { - this.bufferindex = bufferindex; - } - - public ReadBufferStatus getStatus() { - return status; - } - - public void setStatus(ReadBufferStatus status) { - this.status = status; - } - - public CountDownLatch getLatch() { - return latch; - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public long 
getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - public boolean isFirstByteConsumed() { - return isFirstByteConsumed; - } - - public void setFirstByteConsumed(boolean isFirstByteConsumed) { - this.isFirstByteConsumed = isFirstByteConsumed; - } - - public boolean isLastByteConsumed() { - return isLastByteConsumed; - } - - public void setLastByteConsumed(boolean isLastByteConsumed) { - this.isLastByteConsumed = isLastByteConsumed; - } - - public boolean isAnyByteConsumed() { - return isAnyByteConsumed; - } - - public void setAnyByteConsumed(boolean isAnyByteConsumed) { - this.isAnyByteConsumed = isAnyByteConsumed; - } - -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java deleted file mode 100644 index 5b1e21529..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. 
- */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. 
If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. - * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. 
- * - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, 
availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. - * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * return the ID of ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java index d63674977..e021401aa 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java @@ -18,12 +18,18 @@ package seaweed.hdfs; -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in buffer. It should be in completedList - READ_FAILED // read completed, but failed. 
+import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +public class SeaweedAbstractFileSystem extends DelegateToFileSystem { + + SeaweedAbstractFileSystem(final URI uri, final Configuration conf) + throws IOException, URISyntaxException { + super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false); + } + } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index d471d8440..85490c181 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -10,6 +10,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; @@ -22,7 +23,7 @@ import java.util.Map; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; -public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { +public class SeaweedFileSystem extends FileSystem { public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; @@ -144,7 +145,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public boolean rename(Path src, Path dst) { + public boolean rename(Path src, Path dst) throws IOException { LOG.debug("rename path: {} => {}", src, dst); @@ -155,12 +156,13 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { if (src.equals(dst)) { return true; } - FileStatus dstFileStatus = getFileStatus(dst); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst); - String sourceFileName = src.getName(); Path adjustedDst = dst; - if (dstFileStatus != null) { + if (entry != null) { + FileStatus dstFileStatus = getFileStatus(dst); + String sourceFileName = src.getName(); if (!dstFileStatus.isDirectory()) { return false; } @@ -175,18 +177,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public boolean delete(Path path, boolean recursive) { + public boolean delete(Path path, boolean recursive) throws IOException { LOG.debug("delete path: {} recursive:{}", path, recursive); path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { return true; } + FileStatus fileStatus = getFileStatus(path); + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); } @@ -222,9 +226,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); return seaweedFileSystemStore.createDirectory(path, currentUser, @@ -233,6 +237,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } + FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { return true; } else { @@ -241,7 +247,7 @@ public class SeaweedFileSystem extends 
org.apache.hadoop.fs.FileSystem { } @Override - public FileStatus getFileStatus(Path path) { + public FileStatus getFileStatus(Path path) throws IOException { LOG.debug("getFileStatus path: {}", path); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 9617a38be..d9c6d6f0d 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -61,7 +61,7 @@ public class SeaweedFileSystemStore { ); } - public FileStatus[] listEntries(final Path path) { + public FileStatus[] listEntries(final Path path) throws IOException { LOG.debug("listEntries path: {}", path); FileStatus pathStatus = getFileStatus(path); @@ -89,11 +89,11 @@ public class SeaweedFileSystemStore { } - public FileStatus getFileStatus(final Path path) { + public FileStatus getFileStatus(final Path path) throws IOException { FilerProto.Entry entry = lookupEntry(path); if (entry == null) { - return null; + throw new FileNotFoundException("File does not exist: " + path); } LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); @@ -136,7 +136,7 @@ public class SeaweedFileSystemStore { modification_time, access_time, permission, owner, group, null, path); } - private FilerProto.Entry lookupEntry(Path path) { + public FilerProto.Entry lookupEntry(Path path) { return filerClient.lookupEntry(getParentDirectory(path), path.getName()); @@ -212,7 +212,6 @@ public class SeaweedFileSystemStore { LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); - int readAheadQueueDepth = 2; FilerProto.Entry entry = lookupEntry(path); if (entry == null) { @@ -223,8 +222,7 @@ public class SeaweedFileSystemStore { statistics, path.toUri().getPath(), entry, - bufferSize, - readAheadQueueDepth); + bufferSize); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 90c14c772..c26ad728f 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream { private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final long contentLength; private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - private byte[] buffer = null; // will be initialized on first use + private long position = 0; // cursor of the file - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { + final FilerGrpcClient filerGrpcClient, + final Statistics statistics, + final String path, + final FilerProto.Entry 
entry, + final int bufferSize) { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); @@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. - if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = 
ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream { if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } - if (offset >= b.length) { + if (off >= b.length) { throw new IllegalArgumentException("offset greater than length of array"); } - if (length < 0) { + if (len < 0) { throw new IllegalArgumentException("requested read length is less than zero"); } - if (length > (b.length - offset)) { + if (len > (b.length - off)) { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); + long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len); if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } - return (int) bytesRead; + + if (bytesRead > 0) { + this.position += bytesRead; + if (statistics != null) { + statistics.incrementBytesRead(bytesRead); + } + } + + return (int)bytesRead; + } /** @@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } - - // next read will read from here - fCursor = n; + this.position = n; - //invalidate buffer - limit = 0; - bCursor = 0; } @Override @@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - long currentPos = getPos(); - if (currentPos == contentLength) { + if (this.position == contentLength) { if (n > 0) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } } - long newPos = currentPos + n; + long newPos = this.position + n; if (newPos < 0) { newPos = 0; - n = newPos - currentPos; + n = newPos - this.position; } if (newPos > contentLength) { newPos = contentLength; - n = newPos - currentPos; + n = newPos - this.position; } seek(newPos); return n; @@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream { public synchronized int available() throws IOException { if (closed) { throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); + FSExceptionMessages.STREAM_IS_CLOSED); } final long remaining = this.contentLength - this.getPos(); return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? 
(int) remaining : Integer.MAX_VALUE; } /** @@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return position; } /** @@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized void close() throws IOException { closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner } /** diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index e08843caa..1138ecca2 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.ByteBufferPool; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedWrite; @@ -14,6 +15,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -28,16 +30,16 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - private FilerProto.Entry.Builder entry; + private final FilerProto.Entry.Builder entry; + private final boolean supportFlush = false; // true; + private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private long position; private boolean closed; - private boolean supportFlush = true; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; + private ByteBuffer buffer; + private long outputIndex; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -50,18 +52,18 @@ public class SeaweedOutputStream extends OutputStream { this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; @@ -84,7 +86,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { + throws IOException { maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -93,25 +95,29 @@ public 
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index e08843caa..1138ecca2 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 import seaweedfs.client.SeaweedWrite;
@@ -14,6 +15,7 @@ import seaweedfs.client.SeaweedWrite;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.concurrent.*;

 import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -28,16 +30,16 @@ public class SeaweedOutputStream extends OutputStream {
     private final int maxConcurrentRequestCount;
     private final ThreadPoolExecutor threadExecutor;
     private final ExecutorCompletionService<Void> completionService;
-    private FilerProto.Entry.Builder entry;
+    private final FilerProto.Entry.Builder entry;
+    private final boolean supportFlush = false; // true;
+    private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
     private long position;
     private boolean closed;
-    private boolean supportFlush = true;
     private volatile IOException lastError;
     private long lastFlushOffset;
     private long lastTotalAppendOffset = 0;
-    private byte[] buffer;
-    private int bufferIndex;
-    private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+    private ByteBuffer buffer;
+    private long outputIndex;
     private String replication = "000";

     public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -50,18 +52,18 @@ public class SeaweedOutputStream extends OutputStream {
         this.lastError = null;
         this.lastFlushOffset = 0;
         this.bufferSize = bufferSize;
-        this.buffer = new byte[bufferSize];
-        this.bufferIndex = 0;
+        this.buffer = ByteBufferPool.request(bufferSize);
+        this.outputIndex = 0;
         this.writeOperations = new ConcurrentLinkedDeque<>();

         this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();

         this.threadExecutor
-            = new ThreadPoolExecutor(maxConcurrentRequestCount,
-            maxConcurrentRequestCount,
-            10L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>());
+                = new ThreadPoolExecutor(maxConcurrentRequestCount,
+                maxConcurrentRequestCount,
+                10L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>());
         this.completionService = new ExecutorCompletionService<>(this.threadExecutor);

         this.entry = entry;
@@ -84,7 +86,7 @@ public class SeaweedOutputStream extends OutputStream {

     @Override
     public synchronized void write(final byte[] data, final int off, final int length)
-        throws IOException {
+            throws IOException {
         maybeThrowLastError();

         Preconditions.checkArgument(data != null, "null data");
@@ -93,25 +95,29 @@ public class SeaweedOutputStream extends OutputStream {
             throw new IndexOutOfBoundsException();
         }

+        // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
         int currentOffset = off;
-        int writableBytes = bufferSize - bufferIndex;
+        int writableBytes = bufferSize - buffer.position();
         int numberOfBytesToWrite = length;

         while (numberOfBytesToWrite > 0) {
-            if (writableBytes <= numberOfBytesToWrite) {
-                System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-                bufferIndex += writableBytes;
-                writeCurrentBufferToService();
-                currentOffset += writableBytes;
-                numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-            } else {
-                System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-                bufferIndex += numberOfBytesToWrite;
-                numberOfBytesToWrite = 0;
+
+            if (numberOfBytesToWrite < writableBytes) {
+                buffer.put(data, currentOffset, numberOfBytesToWrite);
+                outputIndex += numberOfBytesToWrite;
+                break;
             }

-            writableBytes = bufferSize - bufferIndex;
+            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+            buffer.put(data, currentOffset, writableBytes);
+            outputIndex += writableBytes;
+            currentOffset += writableBytes;
+            writeCurrentBufferToService();
+            numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+            writableBytes = bufferSize - buffer.position();
         }
+
     }

     /**
@@ -147,8 +153,9 @@ public class SeaweedOutputStream extends OutputStream {
             threadExecutor.shutdown();
         } finally {
             lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+            ByteBufferPool.release(buffer);
             buffer = null;
-            bufferIndex = 0;
+            outputIndex = 0;
             closed = true;
             writeOperations.clear();
             if (!threadExecutor.isShutdown()) {
@@ -158,35 +165,17 @@ public class SeaweedOutputStream extends OutputStream {
     }

     private synchronized void writeCurrentBufferToService() throws IOException {
-        if (bufferIndex == 0) {
+        if (buffer.position() == 0) {
             return;
         }

-        final byte[] bytes = buffer;
-        final int bytesLength = bufferIndex;
-
-        buffer = new byte[bufferSize];
-        bufferIndex = 0;
-        final long offset = position;
+        buffer.flip();
+        int bytesLength = buffer.limit() - buffer.position();
+        SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
+        // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
         position += bytesLength;
+        buffer.clear();

-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
-            waitForTaskToComplete();
-        }
-
-        final Future<Void> job = completionService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                // originally: client.append(path, offset, bytes, 0, bytesLength);
-                SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
-                return null;
-            }
-        });
-
-        writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
-        // Try to shrink the queue
-        shrinkWriteOperationQueue();
     }

     private void waitForTaskToComplete() throws IOException {
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index f5d14acdd..00e236aa2 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
     </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+ <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
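Note: the SeaweedOutputStream change above (repeated for hdfs3 further down) replaces the byte[] plus bufferIndex pair with a pooled java.nio.ByteBuffer, whose position() now tracks the fill level. A sketch of the resulting fill-then-flush loop, with flushBuffer() standing in for writeCurrentBufferToService() and a plain allocate() where the real code calls ByteBufferPool.request():

    import java.io.IOException;
    import java.nio.ByteBuffer;

    abstract class BufferedWriteSketch {
        // obtained from ByteBufferPool.request(bufferSize) in the actual diff
        protected final ByteBuffer buffer = ByteBuffer.allocate(4 * 1024 * 1024);

        public synchronized void write(byte[] data, int off, int len) throws IOException {
            int currentOffset = off;
            int remaining = len;
            while (remaining > 0) {
                int writable = buffer.remaining();
                if (remaining < writable) {
                    buffer.put(data, currentOffset, remaining); // fits entirely: stash and return
                    return;
                }
                buffer.put(data, currentOffset, writable);      // top the buffer up...
                currentOffset += writable;
                remaining -= writable;
                flushBuffer();                                  // ...then drain it to the filer
            }
        }

        protected abstract void flushBuffer() throws IOException; // writeCurrentBufferToService() in the diff
    }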
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 8c88b60df..a03068a48 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>

     <properties>
-        <seaweedfs.client.version>1.2.9</seaweedfs.client.version>
+        <seaweedfs.client.version>1.3.6</seaweedfs.client.version>
         <hadoop.version>3.1.1</hadoop.version>
     </properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
-    private SeaweedInputStream stream;
-    private long offset;                 // offset within the file for the buffer
-    private int length;                  // actual length, set after the buffer is filles
-    private int requestedLength;         // requested length of the read
-    private byte[] buffer;               // the buffer itself
-    private int bufferindex = -1;        // index in the buffers array in Buffer manager
-    private ReadBufferStatus status;     // status of the buffer
-    private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
-    // waiting on this buffer gets unblocked
-
-    // fields to help with eviction logic
-    private long timeStamp = 0;          // tick at which buffer became available to read
-    private boolean isFirstByteConsumed = false;
-    private boolean isLastByteConsumed = false;
-    private boolean isAnyByteConsumed = false;
-
-    public SeaweedInputStream getStream() {
-        return stream;
-    }
-
-    public void setStream(SeaweedInputStream stream) {
-        this.stream = stream;
-    }
-
-    public long getOffset() {
-        return offset;
-    }
-
-    public void setOffset(long offset) {
-        this.offset = offset;
-    }
-
-    public int getLength() {
-        return length;
-    }
-
-    public void setLength(int length) {
-        this.length = length;
-    }
-
-    public int getRequestedLength() {
-        return requestedLength;
-    }
-
-    public void setRequestedLength(int requestedLength) {
-        this.requestedLength = requestedLength;
-    }
-
-    public byte[] getBuffer() {
-        return buffer;
-    }
-
-    public void setBuffer(byte[] buffer) {
-        this.buffer = buffer;
-    }
-
-    public int getBufferindex() {
-        return bufferindex;
-    }
-
-    public void setBufferindex(int bufferindex) {
-        this.bufferindex = bufferindex;
-    }
-
-    public ReadBufferStatus getStatus() {
-        return status;
-    }
-
-    public void setStatus(ReadBufferStatus status) {
-        this.status = status;
-    }
-
-    public CountDownLatch getLatch() {
-        return latch;
-    }
-
-    public void setLatch(CountDownLatch latch) {
-        this.latch = latch;
-    }
-
-    public long getTimeStamp() {
-        return timeStamp;
-    }
-
-    public void setTimeStamp(long timeStamp) {
-        this.timeStamp = timeStamp;
-    }
-
-    public boolean isFirstByteConsumed() {
-        return isFirstByteConsumed;
-    }
-
-    public void setFirstByteConsumed(boolean isFirstByteConsumed) {
-        this.isFirstByteConsumed = isFirstByteConsumed;
-    }
-
-    public boolean isLastByteConsumed() {
-        return isLastByteConsumed;
-    }
-
-    public void setLastByteConsumed(boolean isLastByteConsumed) {
-        this.isLastByteConsumed = isLastByteConsumed;
-    }
-
-    public boolean isAnyByteConsumed() {
-        return isAnyByteConsumed;
-    }
-
-    public void setAnyByteConsumed(boolean isAnyByteConsumed) {
-        this.isAnyByteConsumed = isAnyByteConsumed;
-    }
-
-}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
deleted file mode 100644
index 5b1e21529..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package seaweed.hdfs;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * The Read Buffer Manager for Rest AbfsClient.
- */
-final class ReadBufferManager {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-
-    private static final int NUM_BUFFERS = 16;
-    private static final int BLOCK_SIZE = 4 * 1024 * 1024;
-    private static final int NUM_THREADS = 8;
-    private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
-    private Thread[] threads = new Thread[NUM_THREADS];
-    private byte[][] buffers;                        // array of byte[] buffers, to hold the data that is read
-    private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
-
-    private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
-    private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
-    private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
-    private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
-    static {
-        BUFFER_MANAGER = new ReadBufferManager();
-        BUFFER_MANAGER.init();
-    }
-
-    static ReadBufferManager getBufferManager() {
-        return BUFFER_MANAGER;
-    }
-
-    private void init() {
-        buffers = new byte[NUM_BUFFERS][];
-        for (int i = 0; i < NUM_BUFFERS; i++) {
-            buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
-            freeList.add(i);
-        }
-        for (int i = 0; i < NUM_THREADS; i++) {
-            Thread t = new Thread(new ReadBufferWorker(i));
-            t.setDaemon(true);
-            threads[i] = t;
-            t.setName("SeaweedFS-prefetch-" + i);
-            t.start();
-        }
-        ReadBufferWorker.UNLEASH_WORKERS.countDown();
-    }
-
-    // hide instance constructor
-    private ReadBufferManager() {
-    }
-
-
-    /*
-     *
-     *  SeaweedInputStream-facing methods
-     *
-     */
-
-
-    /**
-     * {@link SeaweedInputStream} calls this method to queue read-aheads.
-     *
-     * @param stream          The {@link SeaweedInputStream} for which to do the read-ahead
-     * @param requestedOffset The offset in the file which shoukd be read
-     * @param requestedLength The length to read
-     */
-    void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
-                stream.getPath(), requestedOffset, requestedLength);
-        }
-        ReadBuffer buffer;
-        synchronized (this) {
-            if (isAlreadyQueued(stream, requestedOffset)) {
-                return; // already queued, do not queue again
-            }
-            if (freeList.isEmpty() && !tryEvict()) {
-                return; // no buffers available, cannot queue anything
-            }
-
-            buffer = new ReadBuffer();
-            buffer.setStream(stream);
-            buffer.setOffset(requestedOffset);
-            buffer.setLength(0);
-            buffer.setRequestedLength(requestedLength);
-            buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
-            buffer.setLatch(new CountDownLatch(1));
-
-            Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
-
-            buffer.setBuffer(buffers[bufferIndex]);
-            buffer.setBufferindex(bufferIndex);
-            readAheadQueue.add(buffer);
-            notifyAll();
-        }
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
-                stream.getPath(), requestedOffset, buffer.getBufferindex());
-        }
-    }
-
-
-    /**
-     * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a
-     * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
-     * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
-     * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
-     * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own
-     * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time).
-     *
-     * @param stream   the file to read bytes for
-     * @param position the offset in the file to do a read for
-     * @param length   the length to read
-     * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
-     * @return the number of bytes read
-     */
-    int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
-        // not synchronized, so have to be careful with locking
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("getBlock for file {}  position {}  thread {}",
-                stream.getPath(), position, Thread.currentThread().getName());
-        }
-
-        waitForProcess(stream, position);
-
-        int bytesRead = 0;
-        synchronized (this) {
-            bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
-        }
-        if (bytesRead > 0) {
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("Done read from Cache for {} position {} length {}",
-                    stream.getPath(), position, bytesRead);
-            }
-            return bytesRead;
-        }
-
-        // otherwise, just say we got nothing - calling thread can do its own read
-        return 0;
-    }
-
-    /*
-     *
-     *  Internal methods
-     *
-     */
-
-    private void waitForProcess(final SeaweedInputStream stream, final long position) {
-        ReadBuffer readBuf;
-        synchronized (this) {
-            clearFromReadAheadQueue(stream, position);
-            readBuf = getFromList(inProgressList, stream, position);
-        }
-        if (readBuf != null) { // if in in-progress queue, then block for it
-            try {
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
-                        stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
-                }
-                readBuf.getLatch().await(); // blocking wait on the caller stream's thread
-                // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
-                // is done processing it (in doneReading). There, the latch is set after removing the buffer from
-                // inProgressList. So this latch is safe to be outside the synchronized block.
-                // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
-                // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
-                // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched.
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-            }
-            if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("latch done for file {} buffer idx {} length {}",
-                    stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
-            }
-        }
-    }
-
-    /**
-     * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list.
-     * The objective is to find just one buffer - there is no advantage to evicting more than one.
-     *
-     * @return whether the eviction succeeeded - i.e., were we able to free up one buffer
-     */
-    private synchronized boolean tryEvict() {
-        ReadBuffer nodeToEvict = null;
-        if (completedReadList.size() <= 0) {
-            return false; // there are no evict-able buffers
-        }
-
-        // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
-        for (ReadBuffer buf : completedReadList) {
-            if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
-                nodeToEvict = buf;
-                break;
-            }
-        }
-        if (nodeToEvict != null) {
-            return evict(nodeToEvict);
-        }
-
-        // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
-        for (ReadBuffer buf : completedReadList) {
-            if (buf.isAnyByteConsumed()) {
-                nodeToEvict = buf;
-                break;
-            }
-        }
-
-        if (nodeToEvict != null) {
-            return evict(nodeToEvict);
-        }
-
-        // next, try any old nodes that have not been consumed
-        long earliestBirthday = Long.MAX_VALUE;
-        for (ReadBuffer buf : completedReadList) {
-            if (buf.getTimeStamp() < earliestBirthday) {
-                nodeToEvict = buf;
-                earliestBirthday = buf.getTimeStamp();
-            }
-        }
-        if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
-            return evict(nodeToEvict);
-        }
-
-        // nothing can be evicted
-        return false;
-    }
-
-    private boolean evict(final ReadBuffer buf) {
-        freeList.push(buf.getBufferindex());
-        completedReadList.remove(buf);
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
-                buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
-        }
-        return true;
-    }
-
-    private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
-        // returns true if any part of the buffer is already queued
-        return (isInList(readAheadQueue, stream, requestedOffset)
-            || isInList(inProgressList, stream, requestedOffset)
-            || isInList(completedReadList, stream, requestedOffset));
-    }
-
-    private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
-        return (getFromList(list, stream, requestedOffset) != null);
-    }
-
-    private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
-        for (ReadBuffer buffer : list) {
-            if (buffer.getStream() == stream) {
-                if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
-                    && requestedOffset >= buffer.getOffset()
-                    && requestedOffset < buffer.getOffset() + buffer.getLength()) {
-                    return buffer;
-                } else if (requestedOffset >= buffer.getOffset()
-                    && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
-                    return buffer;
-                }
-            }
-        }
-        return null;
-    }
-
-    private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
-        ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
-        if (buffer != null) {
-            readAheadQueue.remove(buffer);
-            notifyAll(); // lock is held in calling method
-            freeList.push(buffer.getBufferindex());
-        }
-    }
-
-    private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
-                                           final byte[] buffer) {
-        ReadBuffer buf = getFromList(completedReadList, stream, position);
-        if (buf == null || position >= buf.getOffset() + buf.getLength()) {
-            return 0;
-        }
-        int cursor = (int) (position - buf.getOffset());
-        int availableLengthInBuffer = buf.getLength() - cursor;
-        int lengthToCopy = Math.min(length, availableLengthInBuffer);
-        System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
-        if (cursor == 0) {
-            buf.setFirstByteConsumed(true);
-        }
-        if (cursor + lengthToCopy == buf.getLength()) {
-            buf.setLastByteConsumed(true);
-        }
-        buf.setAnyByteConsumed(true);
-        return lengthToCopy;
-    }
-
-    /*
-     *
-     *  ReadBufferWorker-thread-facing methods
-     *
-     */
-
-    /**
-     * ReadBufferWorker thread calls this to get the next buffer that it should work on.
-     *
-     * @return {@link ReadBuffer}
-     * @throws InterruptedException if thread is interrupted
-     */
-    ReadBuffer getNextBlockToRead() throws InterruptedException {
-        ReadBuffer buffer = null;
-        synchronized (this) {
-            //buffer = readAheadQueue.take();  // blocking method
-            while (readAheadQueue.size() == 0) {
-                wait();
-            }
-            buffer = readAheadQueue.remove();
-            notifyAll();
-            if (buffer == null) {
-                return null; // should never happen
-            }
-            buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
-            inProgressList.add(buffer);
-        }
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
-                buffer.getStream().getPath(), buffer.getOffset());
-        }
-        return buffer;
-    }
-
-    /**
-     * ReadBufferWorker thread calls this method to post completion.
-     *
-     * @param buffer            the buffer whose read was completed
-     * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
-     * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
-     */
-    void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
-                buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
-        }
-        synchronized (this) {
-            inProgressList.remove(buffer);
-            if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
-                buffer.setStatus(ReadBufferStatus.AVAILABLE);
-                buffer.setTimeStamp(currentTimeMillis());
-                buffer.setLength(bytesActuallyRead);
-                completedReadList.add(buffer);
-            } else {
-                freeList.push(buffer.getBufferindex());
-                // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
-            }
-        }
-        //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
-        buffer.getLatch().countDown(); // wake up waiting threads (if any)
-    }
-
-    /**
-     * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
-     * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
-     * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
-     * Note: it is not monotonic across Sockets, and even within a CPU, its only the
-     * more recent parts which share a clock across all cores.
-     *
-     * @return current time in milliseconds
-     */
-    private long currentTimeMillis() {
-        return System.nanoTime() / 1000 / 1000;
-    }
-}
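Note: with ReadBuffer and ReadBufferManager gone, the whole prefetch pipeline disappears: a singleton manager owning sixteen 4 MB slabs, eight daemon worker threads, and three queues (read-ahead, in-progress, completed) coordinated with wait/notify plus a per-buffer CountDownLatch. Its coordination pattern, reduced to an illustrative minimum (the names below are not the original API):

    import java.util.LinkedList;
    import java.util.Queue;

    // A hand-rolled blocking queue in the style of the deleted manager:
    // producers enqueue under the monitor and notify; workers wait while empty.
    class PrefetchQueueSketch<T> {
        private final Queue<T> queue = new LinkedList<>();

        synchronized void submit(T request) {
            queue.add(request);
            notifyAll(); // wake any worker blocked in take()
        }

        synchronized T take() throws InterruptedException {
            while (queue.isEmpty()) {
                wait(); // releases the monitor while blocked
            }
            return queue.remove();
        }
    }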
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
deleted file mode 100644
index 6ffbc4644..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBufferWorker implements Runnable {
-
-    protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
-    private int id;
-
-    ReadBufferWorker(final int id) {
-        this.id = id;
-    }
-
-    /**
-     * return the ID of ReadBufferWorker.
-     */
-    public int getId() {
-        return this.id;
-    }
-
-    /**
-     * Waits until a buffer becomes available in ReadAheadQueue.
-     * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
-     * Rinse and repeat. Forever.
-     */
-    public void run() {
-        try {
-            UNLEASH_WORKERS.await();
-        } catch (InterruptedException ex) {
-            Thread.currentThread().interrupt();
-        }
-        ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
-        ReadBuffer buffer;
-        while (true) {
-            try {
-                buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-                return;
-            }
-            if (buffer != null) {
-                try {
-                    // do the actual read, from the file.
-                    int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
-                    bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
-                } catch (Exception ex) {
-                    bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
-                }
-            }
-        }
-    }
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
index d63674977..e021401aa 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
@@ -18,12 +18,18 @@
 package seaweed.hdfs;

-/**
- * The ReadBufferStatus for Rest AbfsClient
- */
-public enum ReadBufferStatus {
-    NOT_AVAILABLE,       // buffers sitting in readaheadqueue have this stats
-    READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
-    AVAILABLE,           // data is available in buffer. It should be in completedList
-    READ_FAILED          // read completed, but failed.
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
+
+    SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
+            throws IOException, URISyntaxException {
+        super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
+    }
+
 }
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index c12da8261..85490c181 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -10,6 +10,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import seaweedfs.client.FilerProto;

 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -144,7 +145,7 @@ public class SeaweedFileSystem extends FileSystem {
     }

     @Override
-    public boolean rename(Path src, Path dst) {
+    public boolean rename(Path src, Path dst) throws IOException {

         LOG.debug("rename path: {} => {}", src, dst);

@@ -155,12 +156,13 @@ public class SeaweedFileSystem extends FileSystem {
         if (src.equals(dst)) {
             return true;
         }
-        FileStatus dstFileStatus = getFileStatus(dst);
+        FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);

-        String sourceFileName = src.getName();
         Path adjustedDst = dst;

-        if (dstFileStatus != null) {
+        if (entry != null) {
+            FileStatus dstFileStatus = getFileStatus(dst);
+            String sourceFileName = src.getName();
             if (!dstFileStatus.isDirectory()) {
                 return false;
             }
@@ -175,18 +177,20 @@ public class SeaweedFileSystem extends FileSystem {
     }

     @Override
-    public boolean delete(Path path, boolean recursive) {
+    public boolean delete(Path path, boolean recursive) throws IOException {

         LOG.debug("delete path: {} recursive:{}", path, recursive);

         path = qualify(path);

-        FileStatus fileStatus = getFileStatus(path);
+        FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);

-        if (fileStatus == null) {
+        if (entry == null) {
             return true;
         }

+        FileStatus fileStatus = getFileStatus(path);
+
         return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);

     }
@@ -222,9 +226,9 @@ public class SeaweedFileSystem extends FileSystem {

         path = qualify(path);

-        FileStatus fileStatus = getFileStatus(path);
+        FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);

-        if (fileStatus == null) {
+        if (entry == null) {

             UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
             return seaweedFileSystemStore.createDirectory(path, currentUser,
@@ -233,6 +237,8 @@ public class SeaweedFileSystem extends FileSystem {

         }

+        FileStatus fileStatus = getFileStatus(path);
+
         if (fileStatus.isDirectory()) {
             return true;
         } else {
@@ -241,7 +247,7 @@ public class SeaweedFileSystem extends FileSystem {
     }

     @Override
-    public FileStatus getFileStatus(Path path) {
+    public FileStatus getFileStatus(Path path) throws IOException {

         LOG.debug("getFileStatus path: {}", path);
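Note: rename(), delete(), and mkdirs() above all switch to the same existence probe: seaweedFileSystemStore.lookupEntry() returns null for a missing path, while getFileStatus() now declares IOException and (per the store change below) throws FileNotFoundException instead of returning null. The delete() post-image, read straight through, shows the idiom:

    // Mirrors the hunk above; qualify(), seaweedFileSystemStore and
    // getFileStatus() are members of the surrounding SeaweedFileSystem class.
    @Override
    public boolean delete(Path path, boolean recursive) throws IOException {
        path = qualify(path);
        FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
        if (entry == null) {
            return true; // deleting a missing path counts as success
        }
        FileStatus fileStatus = getFileStatus(path); // safe now: the entry exists
        return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
    }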
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 9617a38be..d9c6d6f0d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -61,7 +61,7 @@ public class SeaweedFileSystemStore {
         );
     }

-    public FileStatus[] listEntries(final Path path) {
+    public FileStatus[] listEntries(final Path path) throws IOException {
         LOG.debug("listEntries path: {}", path);

         FileStatus pathStatus = getFileStatus(path);
@@ -89,11 +89,11 @@ public class SeaweedFileSystemStore {

     }

-    public FileStatus getFileStatus(final Path path) {
+    public FileStatus getFileStatus(final Path path) throws IOException {

         FilerProto.Entry entry = lookupEntry(path);
         if (entry == null) {
-            return null;
+            throw new FileNotFoundException("File does not exist: " + path);
         }
         LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);

@@ -136,7 +136,7 @@ public class SeaweedFileSystemStore {
             modification_time, access_time, permission, owner, group, null, path);
     }

-    private FilerProto.Entry lookupEntry(Path path) {
+    public FilerProto.Entry lookupEntry(Path path) {

         return filerClient.lookupEntry(getParentDirectory(path), path.getName());

@@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {

         LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);

-        int readAheadQueueDepth = 2;
         FilerProto.Entry entry = lookupEntry(path);

         if (entry == null) {
@@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
             statistics,
             path.toUri().getPath(),
             entry,
-            bufferSize,
-            readAheadQueueDepth);
+            bufferSize);
     }

     public void setOwner(Path path, String owner, String group) {
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 90c14c772..c26ad728f 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -27,33 +27,23 @@ public class SeaweedInputStream extends FSInputStream {
     private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
     private final long contentLength;
     private final int bufferSize; // default buffer size
-    private final int readAheadQueueDepth;  // initialized in constructor
-    private final boolean readAheadEnabled; // whether enable readAhead;

-    private byte[] buffer = null;           // will be initialized on first use
+    private long position = 0;  // cursor of the file

-    private long fCursor = 0;  // cursor of buffer within file - offset of next byte to read from remote server
-    private long fCursorAfterLastRead = -1;
-    private int bCursor = 0;   // cursor of read within buffer - offset of next byte to be returned from buffer
-    private int limit = 0;     // offset of next byte to be read into buffer from service (i.e., upper marker+1
-    //                            of valid bytes in buffer)
     private boolean closed = false;

     public SeaweedInputStream(
-        final FilerGrpcClient filerGrpcClient,
-        final Statistics statistics,
-        final String path,
-        final FilerProto.Entry entry,
-        final int bufferSize,
-        final int readAheadQueueDepth) {
+            final FilerGrpcClient filerGrpcClient,
+            final Statistics statistics,
+            final String path,
+            final FilerProto.Entry entry,
+            final int bufferSize) {
         this.filerGrpcClient = filerGrpcClient;
         this.statistics = statistics;
         this.path = path;
         this.entry = entry;
         this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
         this.bufferSize = bufferSize;
-        this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
-        this.readAheadEnabled = true;

         this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());

@@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {

     @Override
     public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
-        int currentOff = off;
-        int currentLen = len;
-        int lastReadBytes;
-        int totalReadBytes = 0;
-        do {
-            lastReadBytes = readOneBlock(b, currentOff, currentLen);
-            if (lastReadBytes > 0) {
-                currentOff += lastReadBytes;
-                currentLen -= lastReadBytes;
-                totalReadBytes += lastReadBytes;
-            }
-            if (currentLen <= 0 || currentLen > b.length - currentOff) {
-                break;
-            }
-        } while (lastReadBytes > 0);
-        return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
-    }
-
-    private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
-        if (closed) {
-            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-        }
-
-        Preconditions.checkNotNull(b);
-
-        if (len == 0) {
-            return 0;
-        }
-
-        if (this.available() == 0) {
-            return -1;
-        }
-
-        if (off < 0 || len < 0 || len > b.length - off) {
-            throw new IndexOutOfBoundsException();
-        }
-
-        //If buffer is empty, then fill the buffer.
-        if (bCursor == limit) {
-            //If EOF, then return -1
-            if (fCursor >= contentLength) {
-                return -1;
-            }
-
-            long bytesRead = 0;
-            //reset buffer to initial state - i.e., throw away existing data
-            bCursor = 0;
-            limit = 0;
-            if (buffer == null) {
-                buffer = new byte[bufferSize];
-            }
-
-            // Enable readAhead when reading sequentially
-            if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
-                bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
-            } else {
-                bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
-            }
-
-            if (bytesRead == -1) {
-                return -1;
-            }
-
-            limit += bytesRead;
-            fCursor += bytesRead;
-            fCursorAfterLastRead = fCursor;
-        }
-
-        //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
-        //(bytes returned may be less than requested)
-        int bytesRemaining = limit - bCursor;
-        int bytesToRead = Math.min(len, bytesRemaining);
-        System.arraycopy(buffer, bCursor, b, off, bytesToRead);
-        bCursor += bytesToRead;
-        if (statistics != null) {
-            statistics.incrementBytesRead(bytesToRead);
-        }
-        return bytesToRead;
-    }
-
-
-    private int readInternal(final long position, final byte[] b, final int offset, final int length,
-                             final boolean bypassReadAhead) throws IOException {
-        if (readAheadEnabled && !bypassReadAhead) {
-            // try reading from read-ahead
-            if (offset != 0) {
-                throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
-            }
-            int receivedBytes;
-
-            // queue read-aheads
-            int numReadAheads = this.readAheadQueueDepth;
-            long nextSize;
-            long nextOffset = position;
-            while (numReadAheads > 0 && nextOffset < contentLength) {
-                nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
-                ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
-                nextOffset = nextOffset + nextSize;
-                numReadAheads--;
-            }
-
-            // try reading from buffers first
-            receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
-            if (receivedBytes > 0) {
-                return receivedBytes;
-            }
-
-            // got nothing from read-ahead, do our own read now
-            receivedBytes = readRemote(position, b, offset, length);
-            return receivedBytes;
-        } else {
-            return readRemote(position, b, offset, length);
-        }
-    }
-
-    int readRemote(long position, byte[] b, int offset, int length) throws IOException {
         if (position < 0) {
             throw new IllegalArgumentException("attempting to read from negative offset");
         }
@@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
         if (b == null) {
             throw new IllegalArgumentException("null byte array passed in to read() method");
         }
-        if (offset >= b.length) {
+        if (off >= b.length) {
             throw new IllegalArgumentException("offset greater than length of array");
         }
-        if (length < 0) {
+        if (len < 0) {
             throw new IllegalArgumentException("requested read length is less than zero");
         }
-        if (length > (b.length - offset)) {
+        if (len > (b.length - off)) {
             throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
         }

-        long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
+        long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
         if (bytesRead > Integer.MAX_VALUE) {
             throw new IOException("Unexpected Content-Length");
         }
-        return (int) bytesRead;
+
+        if (bytesRead > 0) {
+            this.position += bytesRead;
+            if (statistics != null) {
+                statistics.incrementBytesRead(bytesRead);
+            }
+        }
+
+        return (int)bytesRead;
+
     }

     /**
@@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
             throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
         }

-        if (n >= fCursor - limit && n <= fCursor) { // within buffer
-            bCursor = (int) (n - (fCursor - limit));
-            return;
-        }
-
-        // next read will read from here
-        fCursor = n;
+        this.position = n;

-        //invalidate buffer
-        limit = 0;
-        bCursor = 0;
     }

@@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
         if (closed) {
             throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
         }
-        long currentPos = getPos();
-        if (currentPos == contentLength) {
+        if (this.position == contentLength) {
             if (n > 0) {
                 throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
             }
         }
-        long newPos = currentPos + n;
+        long newPos = this.position + n;
         if (newPos < 0) {
             newPos = 0;
-            n = newPos - currentPos;
+            n = newPos - this.position;
         }
         if (newPos > contentLength) {
             newPos = contentLength;
-            n = newPos - currentPos;
+            n = newPos - this.position;
         }
         seek(newPos);
         return n;
@@ -289,11 +163,11 @@ public class SeaweedInputStream extends FSInputStream {
     public synchronized int available() throws IOException {
         if (closed) {
             throw new IOException(
-                FSExceptionMessages.STREAM_IS_CLOSED);
+                    FSExceptionMessages.STREAM_IS_CLOSED);
         }
         final long remaining = this.contentLength - this.getPos();
         return remaining <= Integer.MAX_VALUE
-            ? (int) remaining : Integer.MAX_VALUE;
+                ? (int) remaining : Integer.MAX_VALUE;
     }

     /**
@@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
         if (closed) {
             throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
         }
-        return fCursor - limit + bCursor;
+        return position;
     }

     /**
@@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
     @Override
     public synchronized void close() throws IOException {
         closed = true;
-        buffer = null; // de-reference the buffer so it can be GC'ed sooner
     }

     /**
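Note: with the prefetch buffer gone, seek() degenerates to an assignment and skip() to clamped cursor arithmetic, as the hunks above show. The skip() clamping logic in isolation (the EOF precondition check is omitted here), using the position and contentLength fields from the diff:

    // Clamp the target offset into [0, contentLength] and report the
    // distance actually moved; seek(newPos) just sets position = newPos now.
    public synchronized long skip(long n) throws IOException {
        long newPos = position + n;
        if (newPos < 0) {
            newPos = 0;               // cannot move before the start of the file
        }
        if (newPos > contentLength) {
            newPos = contentLength;   // cannot move past EOF
        }
        long moved = newPos - position;
        seek(newPos);
        return moved;
    }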
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 96af27fe0..9ea26776b 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 import seaweedfs.client.SeaweedWrite;
@@ -16,14 +17,10 @@ import seaweedfs.client.SeaweedWrite;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Locale;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;

 import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;

@@ -37,16 +34,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
     private final int maxConcurrentRequestCount;
     private final ThreadPoolExecutor threadExecutor;
     private final ExecutorCompletionService<Void> completionService;
-    private FilerProto.Entry.Builder entry;
+    private final FilerProto.Entry.Builder entry;
+    private final boolean supportFlush = false; // true;
+    private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
     private long position;
     private boolean closed;
-    private boolean supportFlush = true;
     private volatile IOException lastError;
     private long lastFlushOffset;
     private long lastTotalAppendOffset = 0;
-    private byte[] buffer;
-    private int bufferIndex;
-    private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+    private ByteBuffer buffer;
+    private long outputIndex;
     private String replication = "000";

     public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -59,18 +56,18 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
         this.lastError = null;
         this.lastFlushOffset = 0;
         this.bufferSize = bufferSize;
-        this.buffer = new byte[bufferSize];
-        this.bufferIndex = 0;
+        this.buffer = ByteBufferPool.request(bufferSize);
+        this.outputIndex = 0;
         this.writeOperations = new ConcurrentLinkedDeque<>();

         this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();

         this.threadExecutor
-            = new ThreadPoolExecutor(maxConcurrentRequestCount,
-            maxConcurrentRequestCount,
-            10L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>());
+                = new ThreadPoolExecutor(maxConcurrentRequestCount,
+                maxConcurrentRequestCount,
+                10L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>());
         this.completionService = new ExecutorCompletionService<>(this.threadExecutor);

         this.entry = entry;
@@ -93,7 +90,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea

     @Override
     public synchronized void write(final byte[] data, final int off, final int length)
-        throws IOException {
+            throws IOException {
         maybeThrowLastError();

         Preconditions.checkArgument(data != null, "null data");
@@ -102,25 +99,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
             throw new IndexOutOfBoundsException();
         }

+        // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
         int currentOffset = off;
-        int writableBytes = bufferSize - bufferIndex;
+        int writableBytes = bufferSize - buffer.position();
        int numberOfBytesToWrite = length;

         while (numberOfBytesToWrite > 0) {
-            if (writableBytes <= numberOfBytesToWrite) {
-                System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-                bufferIndex += writableBytes;
-                writeCurrentBufferToService();
-                currentOffset += writableBytes;
-                numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-            } else {
-                System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-                bufferIndex += numberOfBytesToWrite;
-                numberOfBytesToWrite = 0;
+
+            if (numberOfBytesToWrite < writableBytes) {
+                buffer.put(data, currentOffset, numberOfBytesToWrite);
+                outputIndex += numberOfBytesToWrite;
+                break;
             }

-            writableBytes = bufferSize - bufferIndex;
+            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+            buffer.put(data, currentOffset, writableBytes);
+            outputIndex += writableBytes;
+            currentOffset += writableBytes;
+            writeCurrentBufferToService();
+            numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+            writableBytes = bufferSize - buffer.position();
         }
+
     }

     /**
@@ -199,8 +200,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
             threadExecutor.shutdown();
         } finally {
             lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+            ByteBufferPool.release(buffer);
             buffer = null;
-            bufferIndex = 0;
+            outputIndex = 0;
             closed = true;
             writeOperations.clear();
             if (!threadExecutor.isShutdown()) {
@@ -210,35 +212,17 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
     }

     private synchronized void writeCurrentBufferToService() throws IOException {
-        if (bufferIndex == 0) {
+        if (buffer.position() == 0) {
             return;
         }

-        final byte[] bytes = buffer;
-        final int bytesLength = bufferIndex;
-
-        buffer = new byte[bufferSize];
-        bufferIndex = 0;
-        final long offset = position;
+        buffer.flip();
+        int bytesLength = buffer.limit() - buffer.position();
+        SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
+        // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
         position += bytesLength;
+        buffer.clear();

-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
-            waitForTaskToComplete();
-        }
-
-        final Future<Void> job = completionService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                // originally: client.append(path, offset, bytes, 0, bytesLength);
-                SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
-                return null;
-            }
-        });
-
-        writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
-        // Try to shrink the queue
-        shrinkWriteOperationQueue();
     }

     private void waitForTaskToComplete() throws IOException {
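Note: writeCurrentBufferToService() is now fully synchronous in both modules: the flip()/write/clear() cycle above replaces submitting a Callable to the completion service, so the executor, the writeOperations deque, and the queue-shrinking logic no longer sit on the write path. The flush cycle on its own, with writeChunk() as a hypothetical stand-in for SeaweedWrite.writeData():

    // flip() exposes the filled region [0, limit); after writing it out the
    // file offset advances by that many bytes and clear() readies a refill.
    private synchronized void flushBuffer() throws IOException {
        if (buffer.position() == 0) {
            return; // nothing buffered yet
        }
        buffer.flip();
        int bytesLength = buffer.limit() - buffer.position();
        writeChunk(position, buffer.array(), buffer.position(), buffer.limit());
        position += bytesLength;
        buffer.clear();
    }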
