author     hasagi <30975629+LIBA-S@users.noreply.github.com>  2020-09-22 21:38:38 +0800
committer  GitHub <noreply@github.com>                        2020-09-22 21:38:38 +0800
commit     d7bf2390e2bf4ac55132878faa68119b3558e8e4 (patch)
tree       48ede45893c2130d3e039f7fe4af8440835eb02d /other/java
parent     37e964d4bd60a9dd792a9cc24f05eaa05d3766f2 (diff)
parent     ec5b9f1e91a8609d0e70bf9d26dc0840774153c4 (diff)
Merge pull request #1 from chrislusf/master
catch up
Diffstat (limited to 'other/java')
20 files changed, 536 insertions, 81 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index b2e88935c..edfd38a43 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.4.1</version>
+    <version>1.4.7</version>

     <parent>
         <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index b2e88935c..edfd38a43 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
    <artifactId>seaweedfs-client</artifactId>
-    <version>1.4.1</version>
+    <version>1.4.7</version>

     <parent>
         <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index d2d4cbb5d..6b553c831 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.4.1</version>
+    <version>1.4.7</version>

     <parent>
         <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index 58870d742..7afa2dca0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -15,6 +15,7 @@ public class ChunkCache {
         }
         this.cache = CacheBuilder.newBuilder()
                 .maximumSize(maxEntries)
+                .weakValues()
                 .expireAfterAccess(1, TimeUnit.HOURS)
                 .build();
     }
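A note on the ChunkCache change above: Guava's `weakValues()` keeps cache values behind weak references, so cached chunk bytes can be reclaimed by the garbage collector under memory pressure instead of being pinned until the one-hour expiry. A minimal sketch of the resulting cache construction, assuming only Guava on the classpath (the size of 4 mirrors the `new ChunkCache(4)` default introduced later in this diff):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;

public class WeakChunkCacheSketch {
    // With weakValues(), an entry disappears once no strong reference to the
    // byte[] remains, so the cache yields memory to the GC instead of pinning it.
    private final Cache<String, byte[]> cache = CacheBuilder.newBuilder()
            .maximumSize(4)
            .weakValues()
            .expireAfterAccess(1, TimeUnit.HOURS)
            .build();

    public byte[] getChunk(String fileId) {
        return cache.getIfPresent(fileId);
    }

    public void setChunk(String fileId, byte[] chunkData) {
        cache.put(fileId, chunkData);
    }
}
```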
LOG.debug("doFetchFullChunkData:{}", chunkView); chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); } - LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); - SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + if (chunk.getIsChunkManifest()){ + LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); + SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + } return chunkData; diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 57b67f6b0..1a719f3c0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -9,6 +9,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { @@ -24,6 +26,7 @@ public class FilerGrpcClient { } } + public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>(); private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java index 248285dd3..4909094f5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -18,14 +18,18 @@ public class Gzip { return compressed; } - public static byte[] decompress(byte[] compressed) throws IOException { - ByteArrayInputStream bis = new ByteArrayInputStream(compressed); - GZIPInputStream gis = new GZIPInputStream(bis); - return readAll(gis); + public static byte[] decompress(byte[] compressed) { + try { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } catch (Exception e) { + return compressed; + } } private static byte[] readAll(InputStream input) throws IOException { - try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { byte[] buffer = new byte[4096]; int n; while (-1 != (n = input.read(buffer))) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index f0490540d..045751717 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -1,7 +1,10 @@ package seaweedfs.client; +import org.apache.http.Header; +import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; +import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.util.EntityUtils; @@ -15,12 +18,12 @@ public class SeaweedRead { private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); - static ChunkCache chunkCache = new ChunkCache(0); + static ChunkCache chunkCache = new ChunkCache(4); // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, final long position, final byte[] buffer, final int 
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
index 248285dd3..4909094f5 100644
--- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -18,14 +18,18 @@ public class Gzip {
         return compressed;
     }

-    public static byte[] decompress(byte[] compressed) throws IOException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
-        GZIPInputStream gis = new GZIPInputStream(bis);
-        return readAll(gis);
+    public static byte[] decompress(byte[] compressed) {
+        try {
+            ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+            GZIPInputStream gis = new GZIPInputStream(bis);
+            return readAll(gis);
+        } catch (Exception e) {
+            return compressed;
+        }
     }

     private static byte[] readAll(InputStream input) throws IOException {
-        try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
+        try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
             byte[] buffer = new byte[4096];
             int n;
             while (-1 != (n = input.read(buffer))) {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index f0490540d..045751717 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -1,7 +1,10 @@
 package seaweedfs.client;

+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
+import org.apache.http.client.entity.GzipDecompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.util.EntityUtils;
@@ -15,12 +18,12 @@ public class SeaweedRead {

     private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);

-    static ChunkCache chunkCache = new ChunkCache(0);
+    static ChunkCache chunkCache = new ChunkCache(4);

     // returns bytesRead
     public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
                             final long position, final byte[] buffer, final int bufferOffset,
-                            final int bufferLength) throws IOException {
+                            final int bufferLength, final long fileSize) throws IOException {

         List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);

@@ -39,6 +42,14 @@ public class SeaweedRead {
         long readCount = 0;
         int startOffset = bufferOffset;
         for (ChunkView chunkView : chunkViews) {
+
+            if (startOffset < chunkView.logicOffset) {
+                long gap = chunkView.logicOffset - startOffset;
+                LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+                readCount += gap;
+                startOffset += gap;
+            }
+
             FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
             if (locations == null || locations.getLocationsCount() == 0) {
                 LOG.error("failed to locate {}", chunkView.fileId);
@@ -48,11 +59,22 @@ public class SeaweedRead {

             int len = readChunkView(position, buffer, startOffset, chunkView, locations);

+            LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
+
             readCount += len;
             startOffset += len;
         }

+        long limit = Math.min(bufferLength, fileSize);
+
+        if (startOffset < limit) {
+            long gap = limit - startOffset;
+            LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+            readCount += gap;
+            startOffset += gap;
+        }
+
         return readCount;
     }

@@ -62,14 +84,13 @@ public class SeaweedRead {

         if (chunkData == null) {
             chunkData = doFetchFullChunkData(chunkView, locations);
+            chunkCache.setChunk(chunkView.fileId, chunkData);
         }

         int len = (int) chunkView.size;
         LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
                 chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
-        System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
-
-        chunkCache.setChunk(chunkView.fileId, chunkData);
+        System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len);

         return len;
     }
@@ -79,7 +100,7 @@ public class SeaweedRead {
         HttpGet request = new HttpGet(
                 String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));

-        request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+        request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");

         byte[] data = null;

@@ -88,6 +109,18 @@ public class SeaweedRead {
             try {
                 HttpEntity entity = response.getEntity();

+                Header contentEncodingHeader = entity.getContentEncoding();
+
+                if (contentEncodingHeader != null) {
+                    HeaderElement[] encodings = contentEncodingHeader.getElements();
+                    for (int i = 0; i < encodings.length; i++) {
+                        if (encodings[i].getName().equalsIgnoreCase("gzip")) {
+                            entity = new GzipDecompressingEntity(entity);
+                            break;
+                        }
+                    }
+                }
+
                 data = EntityUtils.toByteArray(entity);

                 EntityUtils.consume(entity);
@@ -97,10 +130,6 @@ public class SeaweedRead {
             request.releaseConnection();
         }

-        if (chunkView.isCompressed) {
-            data = Gzip.decompress(data);
-        }
-
         if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
             try {
                 data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
@@ -109,6 +138,12 @@ public class SeaweedRead {
             }
         }

+        if (chunkView.isCompressed) {
+            data = Gzip.decompress(data);
+        }
+
+        LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
         return data;
     }

@@ -118,18 +153,19 @@ public class SeaweedRead {
         long stop = offset + size;
         for (VisibleInterval chunk : visibleIntervals) {
-            if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+            long chunkStart = Math.max(offset, chunk.start);
+            long chunkStop = Math.min(stop, chunk.stop);
+            if (chunkStart < chunkStop) {
                 boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
                 views.add(new ChunkView(
                         chunk.fileId,
-                        offset - chunk.start,
-                        Math.min(chunk.stop, stop) - offset,
-                        offset,
+                        chunkStart - chunk.start + chunk.chunkOffset,
+                        chunkStop - chunkStart,
+                        chunkStart,
                         isFullChunk,
                         chunk.cipherKey,
                         chunk.isCompressed
                 ));
-                offset = Math.min(chunk.stop, stop);
             }
         }
         return views;
@@ -144,7 +180,13 @@ public class SeaweedRead {
         Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
             @Override
             public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
-                return (int) (a.getMtime() - b.getMtime());
+                // if just a.getMtime() - b.getMtime(), it will overflow!
+                if (a.getMtime() < b.getMtime()) {
+                    return -1;
+                } else if (a.getMtime() > b.getMtime()) {
+                    return 1;
+                }
+                return 0;
             }
         });

@@ -165,6 +207,7 @@ public class SeaweedRead {
                 chunk.getOffset() + chunk.getSize(),
                 chunk.getFileId(),
                 chunk.getMtime(),
+                0,
                 true,
                 chunk.getCipherKey().toByteArray(),
                 chunk.getIsCompressed()
@@ -187,6 +230,7 @@ public class SeaweedRead {
                         chunk.getOffset(),
                         v.fileId,
                         v.modifiedTime,
+                        v.chunkOffset,
                         false,
                         v.cipherKey,
                         v.isCompressed
@@ -199,6 +243,7 @@ public class SeaweedRead {
                         v.stop,
                         v.fileId,
                         v.modifiedTime,
+                        v.chunkOffset + (chunkStop - v.start),
                         false,
                         v.cipherKey,
                         v.isCompressed
@@ -231,6 +276,10 @@ public class SeaweedRead {
         return fileId;
     }

+    public static long fileSize(FilerProto.Entry entry) {
+        return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+    }
+
     public static long totalSize(List<FilerProto.FileChunk> chunksList) {
         long size = 0;
         for (FilerProto.FileChunk chunk : chunksList) {
@@ -247,15 +296,17 @@ public class SeaweedRead {
         public final long stop;
         public final long modifiedTime;
         public final String fileId;
+        public final long chunkOffset;
         public final boolean isFullChunk;
         public final byte[] cipherKey;
         public final boolean isCompressed;

-        public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+        public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
             this.start = start;
             this.stop = stop;
             this.modifiedTime = modifiedTime;
             this.fileId = fileId;
+            this.chunkOffset = chunkOffset;
             this.isFullChunk = isFullChunk;
             this.cipherKey = cipherKey;
             this.isCompressed = isCompressed;
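The comparator rewrite in `nonOverlappingVisibleIntervals` above deserves the inline comment it carries: casting a 64-bit mtime difference to `int` discards the high bits and can invert the sort order. A quick demonstration with made-up mtime values (the explicit if/else in the patch is equivalent to `Long.compare`):

```java
long a = 4_000_000_000L; // a newer mtime
long b = 0L;             // an older mtime

// The old comparator: truncating the long difference to int flips the sign.
int broken = (int) (a - b);
System.out.println(broken);             // -294967296: a would sort BEFORE b
// The patched comparator behaves like Long.compare:
System.out.println(Long.compare(a, b)); // 1: a correctly sorts after b
```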
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 5f4d888bd..d3cecab75 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -61,9 +61,6 @@ public class SeaweedWrite {

         String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);

-        // cache fileId ~ bytes
-        SeaweedRead.chunkCache.setChunk(fileId, bytes);
-
         LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);

         return FilerProto.FileChunk.newBuilder()
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index dcc18f2a5..9a72bc976 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -58,6 +58,12 @@ service SeaweedFiler {
     rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
     }

+    rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+    }
+
+    rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+    }
+
 }

 //////////////////////////////////////////////////
@@ -102,6 +108,7 @@ message EventNotification {
     bool delete_chunks = 3;
     string new_parent_path = 4;
     bool is_from_other_cluster = 5;
+    repeated int32 signatures = 6;
 }

 message FileChunk {
@@ -150,6 +157,7 @@ message CreateEntryRequest {
     Entry entry = 2;
     bool o_excl = 3;
     bool is_from_other_cluster = 4;
+    repeated int32 signatures = 5;
 }

 message CreateEntryResponse {
@@ -160,6 +168,7 @@ message UpdateEntryRequest {
     string directory = 1;
     Entry entry = 2;
     bool is_from_other_cluster = 3;
+    repeated int32 signatures = 4;
 }

 message UpdateEntryResponse {
 }
@@ -180,6 +189,7 @@ message DeleteEntryRequest {
     bool is_recursive = 5;
     bool ignore_recursive_error = 6;
     bool is_from_other_cluster = 7;
+    repeated int32 signatures = 8;
 }

 message DeleteEntryResponse {
@@ -262,12 +272,16 @@ message GetFilerConfigurationResponse {
     uint32 max_mb = 4;
     string dir_buckets = 5;
     bool cipher = 7;
+    int32 signature = 8;
+    string metrics_address = 9;
+    int32 metrics_interval_sec = 10;
 }

 message SubscribeMetadataRequest {
     string client_name = 1;
     string path_prefix = 2;
     int64 since_ns = 3;
+    int32 signature = 4;
 }
 message SubscribeMetadataResponse {
     string directory = 1;
@@ -302,3 +316,19 @@ message LocateBrokerResponse {
     }
     repeated Resource resources = 2;
 }
+
+// Key-Value operations
+message KvGetRequest {
+    bytes key = 1;
+}
+message KvGetResponse {
+    bytes value = 1;
+    string error = 2;
+}
+message KvPutRequest {
+    bytes key = 1;
+    bytes value = 2;
+}
+message KvPutResponse {
+    string error = 1;
+}
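The new KvGet/KvPut RPCs give the filer a small key-value interface. A sketch of calling them from this client, assuming the stock protoc/gRPC Java codegen (which would expose them as `kvGet`/`kvPut` on the same blocking stub used for `lookupVolume` above); the key and value strings are illustrative only:

```java
import com.google.protobuf.ByteString;

FilerProto.KvPutResponse putResponse = filerGrpcClient.getBlockingStub()
        .kvPut(FilerProto.KvPutRequest.newBuilder()
                .setKey(ByteString.copyFromUtf8("example-key"))
                .setValue(ByteString.copyFromUtf8("example-value"))
                .build());

FilerProto.KvGetResponse getResponse = filerGrpcClient.getBlockingStub()
        .kvGet(FilerProto.KvGetRequest.newBuilder()
                .setKey(ByteString.copyFromUtf8("example-key"))
                .build());

// Both responses report failures in an error string rather than a gRPC status.
if (getResponse.getError().isEmpty()) {
    byte[] value = getResponse.getValue().toByteArray();
}
```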
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 9d0c501ae..0ba8bbae1 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -120,6 +120,180 @@ </plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>xmlenc</artifactId>
+ <groupId>xmlenc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-sslengine</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jets3t</artifactId>
+ <groupId>net.java.dev.jets3t</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration</artifactId>
+ <groupId>commons-configuration</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-core-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +301,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.4.1</seaweedfs.client.version>
+ <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 25a968c07..0e6d14332 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>

     <properties>
-        <seaweedfs.client.version>1.4.1</seaweedfs.client.version>
+        <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
         <hadoop.version>2.9.2</hadoop.version>
     </properties>

@@ -147,6 +147,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 2341d335d..6551548fa 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto;

 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;

-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-
 public class SeaweedFileSystem extends FileSystem {

-    public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
     public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+    public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+    public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
+    public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;

     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);

@@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem {
         path = qualify(path);

         try {
-            InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
-            return new FSDataInputStream(inputStream);
+            int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+            FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
+            return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
         } catch (Exception ex) {
             LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
             return null;
@@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem {

         try {
             String replicaPlacement = String.format("%03d", replication - 1);
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+            int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
             LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem {

     /**
      * {@inheritDoc}
+     *
      * @throws FileNotFoundException if the parent directory is not present -or
-     * is not a directory.
+     *                               is not a directory.
      */
     @Override
     public FSDataOutputStream createNonRecursive(Path path,
@@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem {
                 throw new FileAlreadyExistsException("Not a directory: " + parent);
             }
         }
+        int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
         return create(path, permission,
                 flags.contains(CreateFlag.OVERWRITE), bufferSize,
-                replication, blockSize, progress);
+                replication, seaweedBufferSize, progress);
     }

     @Override
@@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem {
         path = qualify(path);

         try {
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+            int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
             LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem {

     @Override
     public void createSymlink(final Path target, final Path link,
-                              final boolean createParent) throws AccessControlException,
-            FileAlreadyExistsException, FileNotFoundException,
-            ParentNotDirectoryException, UnsupportedFileSystemException,
+                              final boolean createParent)
             throws IOException {
         // Supporting filesystems should override this method
         throw new UnsupportedOperationException(
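Both HDFS shims now size their SeaweedFS buffers from `fs.seaweed.buffer.size` (default 4 MiB) instead of the caller-supplied `bufferSize`, and `open()` wraps the stream in a `BufferedFSInputStream` four times that large. Tuning it is ordinary Hadoop configuration; the host and port below are placeholders:

```java
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set("fs.seaweed.filer.host", "localhost"); // placeholder filer address
conf.setInt("fs.seaweed.filer.port", 8888);     // FS_SEAWEED_DEFAULT_PORT
// Raise the read/write buffer from the 4 MiB default to 8 MiB:
conf.setInt("fs.seaweed.buffer.size", 8 * 1024 * 1024);
```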
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index d9c6d6f0d..53185367a 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -1,5 +1,6 @@
 package seaweed.hdfs;

+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -123,7 +124,7 @@ public class SeaweedFileSystemStore {

     private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
         FilerProto.FuseAttributes attributes = entry.getAttributes();
-        long length = SeaweedRead.totalSize(entry.getChunksList());
+        long length = SeaweedRead.fileSize(entry);
         boolean isDir = entry.getIsDirectory();
         int block_replication = 1;
         int blocksize = 512;
@@ -184,7 +185,7 @@ public class SeaweedFileSystemStore {
             entry.mergeFrom(existingEntry);
             entry.getAttributesBuilder().setMtime(now);
             LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
-            writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+            writePosition = SeaweedRead.fileSize(existingEntry);
             replication = existingEntry.getAttributes().getReplication();
         }
     }
@@ -207,8 +208,8 @@ public class SeaweedFileSystemStore {

     }

-    public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
-                                       int bufferSize) throws IOException {
+    public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
+                                         int bufferSize) throws IOException {

         LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
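Several call sites above switch from `SeaweedRead.totalSize(entry.getChunksList())` to `SeaweedRead.fileSize(entry)`, which takes the larger of the chunk extent and the size recorded in the entry's attributes. The numbers below are made up, just to show why the max matters for sparse files:

```java
// Suppose the chunks only cover the first 8 bytes of a 16-byte sparse file.
long chunkExtent = 8;    // what totalSize(entry.getChunksList()) would return
long attrFileSize = 16;  // entry.getAttributes().getFileSize()

long length = Math.max(chunkExtent, attrFileSize); // 16
// getFileStatus(), contentLength, and append's writePosition now all see 16,
// so the zero-filled tail past the last chunk counts as part of the file.
```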
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 6b3c72f7d..36c0766a4 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream {
         this.statistics = statistics;
         this.path = path;
         this.entry = entry;
-        this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
+        this.contentLength = SeaweedRead.fileSize(entry);
         this.bufferSize = bufferSize;

         this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
@@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
             throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
         }

-        long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
+        long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
         if (bytesRead > Integer.MAX_VALUE) {
             throw new IOException("Unexpected Content-Length");
         }
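SeaweedInputStream passes the file size into `SeaweedRead.read` because read() now zero-fills holes between chunk views and pads the tail up to min(bufferLength, fileSize). A worked sketch of that accounting, with a hypothetical chunk layout:

```java
// Sparse layout: chunk A covers [0,4), chunk B covers [8,12), fileSize = 16,
// and the caller asks for 16 bytes starting at position 0.
long bufferLength = 16, fileSize = 16;
long readCount = 0, startOffset = 0;

long[][] chunkViews = {{0, 4}, {8, 12}}; // {logicOffset, end} per visible view
for (long[] view : chunkViews) {
    if (startOffset < view[0]) {         // hole before this chunk: count zeros
        long gap = view[0] - startOffset;
        readCount += gap;
        startOffset += gap;
    }
    long len = view[1] - view[0];        // bytes actually fetched from a volume
    readCount += len;
    startOffset += len;
}
long limit = Math.min(bufferLength, fileSize);
if (startOffset < limit) {               // sparse tail: count zeros up to EOF
    readCount += limit - startOffset;
}
// readCount == 16 although only 8 bytes crossed the network.
```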
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index 0b01074c3..691be547e 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -120,6 +120,188 @@ </plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>javax.servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-server</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-servlet</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-webapp</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-servlet</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-beanutils</artifactId>
+ <groupId>commons-beanutils</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration2</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>re2j</artifactId>
+ <groupId>com.google.re2j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>kerb-simplekdc</artifactId>
+ <groupId>org.apache.kerby</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-databind</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +309,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.4.1</seaweedfs.client.version>
+ <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 025843d3c..8e8ec1958 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>

     <properties>
-        <seaweedfs.client.version>1.4.1</seaweedfs.client.version>
+        <seaweedfs.client.version>1.4.7</seaweedfs.client.version>
         <hadoop.version>3.1.1</hadoop.version>
     </properties>

@@ -147,6 +147,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 2341d335d..6551548fa 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto;

 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;

-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-
 public class SeaweedFileSystem extends FileSystem {

-    public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
     public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+    public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+    public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
+    public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;

     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);

@@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem {
         path = qualify(path);

         try {
-            InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
-            return new FSDataInputStream(inputStream);
+            int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+            FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
+            return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
         } catch (Exception ex) {
             LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
             return null;
@@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem {

         try {
             String replicaPlacement = String.format("%03d", replication - 1);
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+            int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
             LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem {

     /**
      * {@inheritDoc}
+     *
      * @throws FileNotFoundException if the parent directory is not present -or
-     * is not a directory.
+     *                               is not a directory.
      */
     @Override
     public FSDataOutputStream createNonRecursive(Path path,
@@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem {
                 throw new FileAlreadyExistsException("Not a directory: " + parent);
             }
         }
+        int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
         return create(path, permission,
                 flags.contains(CreateFlag.OVERWRITE), bufferSize,
-                replication, blockSize, progress);
+                replication, seaweedBufferSize, progress);
     }

     @Override
@@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem {
         path = qualify(path);

         try {
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+            int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
             LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem {

     @Override
     public void createSymlink(final Path target, final Path link,
-                              final boolean createParent) throws AccessControlException,
-            FileAlreadyExistsException, FileNotFoundException,
-            ParentNotDirectoryException, UnsupportedFileSystemException,
+                              final boolean createParent)
             throws IOException {
         // Supporting filesystems should override this method
         throw new UnsupportedOperationException(
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index d9c6d6f0d..53185367a 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -1,5 +1,6 @@
 package seaweed.hdfs;

+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -123,7 +124,7 @@ public class SeaweedFileSystemStore {

     private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
         FilerProto.FuseAttributes attributes = entry.getAttributes();
-        long length = SeaweedRead.totalSize(entry.getChunksList());
+        long length = SeaweedRead.fileSize(entry);
         boolean isDir = entry.getIsDirectory();
         int block_replication = 1;
         int blocksize = 512;
@@ -184,7 +185,7 @@ public class SeaweedFileSystemStore {
             entry.mergeFrom(existingEntry);
             entry.getAttributesBuilder().setMtime(now);
             LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
-            writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+            writePosition = SeaweedRead.fileSize(existingEntry);
             replication = existingEntry.getAttributes().getReplication();
         }
     }
@@ -207,8 +208,8 @@ public class SeaweedFileSystemStore {

     }

-    public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
-                                       int bufferSize) throws IOException {
+    public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
+                                         int bufferSize) throws IOException {

         LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 6b3c72f7d..36c0766a4 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream {
         this.statistics = statistics;
         this.path = path;
         this.entry = entry;
-        this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
+        this.contentLength = SeaweedRead.fileSize(entry);
         this.bufferSize = bufferSize;

         this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
@@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
             throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
         }

-        long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
+        long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
         if (bytesRead > Integer.MAX_VALUE) {
             throw new IOException("Unexpected Content-Length");
         }
