aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
authorustuzhanin <55892859+ustuzhanin@users.noreply.github.com>2020-10-02 22:47:25 +0500
committerGitHub <noreply@github.com>2020-10-02 22:47:25 +0500
commit3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch)
treee0b42e531d18136d9e272258187a305690ee2b4d /other/java
parentcbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff)
parent9ab98fa912814686b3035a97b5173c1628fbc0fc (diff)
downloadseaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz
seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml2
-rw-r--r--other/java/client/pom.xml.deploy2
-rw-r--r--other/java/client/pom_debug.xml2
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java55
-rw-r--r--other/java/client/src/main/proto/filer.proto32
-rw-r--r--other/java/hdfs2/dependency-reduced-pom.xml176
-rw-r--r--other/java/hdfs2/pom.xml4
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java29
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java11
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java4
-rw-r--r--other/java/hdfs3/dependency-reduced-pom.xml184
-rw-r--r--other/java/hdfs3/pom.xml4
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java29
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java11
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java4
15 files changed, 486 insertions, 63 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 4d8f93bff..ad9145155 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.4.4</version>
+ <version>1.4.8</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index 4d8f93bff..ad9145155 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.4.4</version>
+ <version>1.4.8</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index bb2ba5e74..55aa56e8b 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.4.4</version>
+ <version>1.4.8</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index cd2f55678..045751717 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -23,7 +23,7 @@ public class SeaweedRead {
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength) throws IOException {
+ final int bufferLength, final long fileSize) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
@@ -42,6 +42,14 @@ public class SeaweedRead {
long readCount = 0;
int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) {
+
+ if (startOffset < chunkView.logicOffset) {
+ long gap = chunkView.logicOffset - startOffset;
+ LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
if (locations == null || locations.getLocationsCount() == 0) {
LOG.error("failed to locate {}", chunkView.fileId);
@@ -51,11 +59,22 @@ public class SeaweedRead {
int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+ LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
+
readCount += len;
startOffset += len;
}
+ long limit = Math.min(bufferLength, fileSize);
+
+ if (startOffset < limit) {
+ long gap = limit - startOffset;
+ LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+ readCount += gap;
+ startOffset += gap;
+ }
+
return readCount;
}
@@ -71,7 +90,7 @@ public class SeaweedRead {
int len = (int) chunkView.size;
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
- System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
+ System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len);
return len;
}
@@ -93,7 +112,7 @@ public class SeaweedRead {
Header contentEncodingHeader = entity.getContentEncoding();
if (contentEncodingHeader != null) {
- HeaderElement[] encodings =contentEncodingHeader.getElements();
+ HeaderElement[] encodings = contentEncodingHeader.getElements();
for (int i = 0; i < encodings.length; i++) {
if (encodings[i].getName().equalsIgnoreCase("gzip")) {
entity = new GzipDecompressingEntity(entity);
@@ -134,18 +153,19 @@ public class SeaweedRead {
long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) {
- if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+ long chunkStart = Math.max(offset, chunk.start);
+ long chunkStop = Math.min(stop, chunk.stop);
+ if (chunkStart < chunkStop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView(
chunk.fileId,
- offset - chunk.start,
- Math.min(chunk.stop, stop) - offset,
- offset,
+ chunkStart - chunk.start + chunk.chunkOffset,
+ chunkStop - chunkStart,
+ chunkStart,
isFullChunk,
chunk.cipherKey,
chunk.isCompressed
));
- offset = Math.min(chunk.stop, stop);
}
}
return views;
@@ -160,7 +180,13 @@ public class SeaweedRead {
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override
public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
- return (int) (a.getMtime() - b.getMtime());
+ // if just a.getMtime() - b.getMtime(), it will overflow!
+ if (a.getMtime() < b.getMtime()) {
+ return -1;
+ } else if (a.getMtime() > b.getMtime()) {
+ return 1;
+ }
+ return 0;
}
});
@@ -181,6 +207,7 @@ public class SeaweedRead {
chunk.getOffset() + chunk.getSize(),
chunk.getFileId(),
chunk.getMtime(),
+ 0,
true,
chunk.getCipherKey().toByteArray(),
chunk.getIsCompressed()
@@ -203,6 +230,7 @@ public class SeaweedRead {
chunk.getOffset(),
v.fileId,
v.modifiedTime,
+ v.chunkOffset,
false,
v.cipherKey,
v.isCompressed
@@ -215,6 +243,7 @@ public class SeaweedRead {
v.stop,
v.fileId,
v.modifiedTime,
+ v.chunkOffset + (chunkStop - v.start),
false,
v.cipherKey,
v.isCompressed
@@ -247,6 +276,10 @@ public class SeaweedRead {
return fileId;
}
+ public static long fileSize(FilerProto.Entry entry) {
+ return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ }
+
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
long size = 0;
for (FilerProto.FileChunk chunk : chunksList) {
@@ -263,15 +296,17 @@ public class SeaweedRead {
public final long stop;
public final long modifiedTime;
public final String fileId;
+ public final long chunkOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
+ this.chunkOffset = chunkOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
this.isCompressed = isCompressed;
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index dcc18f2a5..daa20c378 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -58,6 +58,12 @@ service SeaweedFiler {
rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
}
+ rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+ }
+
+ rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -89,6 +95,8 @@ message Entry {
repeated FileChunk chunks = 3;
FuseAttributes attributes = 4;
map<string, bytes> extended = 5;
+ bytes hard_link_id = 7;
+ int32 hard_link_counter = 8; // only exists in hard link meta data
}
message FullEntry {
@@ -102,6 +110,7 @@ message EventNotification {
bool delete_chunks = 3;
string new_parent_path = 4;
bool is_from_other_cluster = 5;
+ repeated int32 signatures = 6;
}
message FileChunk {
@@ -150,6 +159,7 @@ message CreateEntryRequest {
Entry entry = 2;
bool o_excl = 3;
bool is_from_other_cluster = 4;
+ repeated int32 signatures = 5;
}
message CreateEntryResponse {
@@ -160,6 +170,7 @@ message UpdateEntryRequest {
string directory = 1;
Entry entry = 2;
bool is_from_other_cluster = 3;
+ repeated int32 signatures = 4;
}
message UpdateEntryResponse {
}
@@ -180,6 +191,7 @@ message DeleteEntryRequest {
bool is_recursive = 5;
bool ignore_recursive_error = 6;
bool is_from_other_cluster = 7;
+ repeated int32 signatures = 8;
}
message DeleteEntryResponse {
@@ -262,12 +274,16 @@ message GetFilerConfigurationResponse {
uint32 max_mb = 4;
string dir_buckets = 5;
bool cipher = 7;
+ int32 signature = 8;
+ string metrics_address = 9;
+ int32 metrics_interval_sec = 10;
}
message SubscribeMetadataRequest {
string client_name = 1;
string path_prefix = 2;
int64 since_ns = 3;
+ int32 signature = 4;
}
message SubscribeMetadataResponse {
string directory = 1;
@@ -302,3 +318,19 @@ message LocateBrokerResponse {
}
repeated Resource resources = 2;
}
+
+// Key-Value operations
+message KvGetRequest {
+ bytes key = 1;
+}
+message KvGetResponse {
+ bytes value = 1;
+ string error = 2;
+}
+message KvPutRequest {
+ bytes key = 1;
+ bytes value = 2;
+}
+message KvPutResponse {
+ string error = 1;
+}
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index d00291c98..e3b434b9b 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -120,6 +120,180 @@
</plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>xmlenc</artifactId>
+ <groupId>xmlenc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-sslengine</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jets3t</artifactId>
+ <groupId>net.java.dev.jets3t</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration</artifactId>
+ <groupId>commons-configuration</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-core-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +301,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.4.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.4.8</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 6d9191727..595d070e8 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.4.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.4.8</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
@@ -147,6 +147,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fd8877806..6551548fa 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-
public class SeaweedFileSystem extends FileSystem {
- public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+ public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+ public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
+ public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
@@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
+ return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
@@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem {
try {
String replicaPlacement = String.format("%03d", replication - 1);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem {
/**
* {@inheritDoc}
+ *
* @throws FileNotFoundException if the parent directory is not present -or
- * is not a directory.
+ * is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
@@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize,
- replication, blockSize, progress);
+ replication, seaweedBufferSize, progress);
}
@Override
@@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
+ final boolean createParent) throws
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 0db6a1f49..3dc38fe1e 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -8,14 +8,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
+import seaweedfs.client.*;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
@@ -124,7 +120,7 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = SeaweedRead.totalSize(entry.getChunksList());
+ long length = SeaweedRead.fileSize(entry);
boolean isDir = entry.getIsDirectory();
int block_replication = 1;
int blocksize = 512;
@@ -185,7 +181,7 @@ public class SeaweedFileSystemStore {
entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+ writePosition = SeaweedRead.fileSize(existingEntry);
replication = existingEntry.getAttributes().getReplication();
}
}
@@ -202,6 +198,7 @@ public class SeaweedFileSystemStore {
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
+ SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
}
return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 6b3c72f7d..36c0766a4 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream {
this.statistics = statistics;
this.path = path;
this.entry = entry;
- this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
+ this.contentLength = SeaweedRead.fileSize(entry);
this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
@@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
- long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
+ long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index 0dcc49b3f..46add89fe 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -120,6 +120,188 @@
</plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>javax.servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-server</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-servlet</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-webapp</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-servlet</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-beanutils</artifactId>
+ <groupId>commons-beanutils</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration2</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>re2j</artifactId>
+ <groupId>com.google.re2j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>kerb-simplekdc</artifactId>
+ <groupId>org.apache.kerby</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-databind</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +309,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.4.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.4.8</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 05a613759..240c6eb41 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.4.4</seaweedfs.client.version>
+ <seaweedfs.client.version>1.4.8</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
@@ -147,6 +147,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fd8877806..6551548fa 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-
public class SeaweedFileSystem extends FileSystem {
- public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+ public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
+ public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
+ public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
@@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024));
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize);
+ return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
@@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem {
try {
String replicaPlacement = String.format("%03d", replication - 1);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem {
/**
* {@inheritDoc}
+ *
* @throws FileNotFoundException if the parent directory is not present -or
- * is not a directory.
+ * is not a directory.
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
@@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
return create(path, permission,
flags.contains(CreateFlag.OVERWRITE), bufferSize,
- replication, blockSize, progress);
+ replication, seaweedBufferSize, progress);
}
@Override
@@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
try {
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
+ int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
@@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
+ final boolean createParent) throws
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 0db6a1f49..3dc38fe1e 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -8,14 +8,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
+import seaweedfs.client.*;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
@@ -124,7 +120,7 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = SeaweedRead.totalSize(entry.getChunksList());
+ long length = SeaweedRead.fileSize(entry);
boolean isDir = entry.getIsDirectory();
int block_replication = 1;
int blocksize = 512;
@@ -185,7 +181,7 @@ public class SeaweedFileSystemStore {
entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
+ writePosition = SeaweedRead.fileSize(existingEntry);
replication = existingEntry.getAttributes().getReplication();
}
}
@@ -202,6 +198,7 @@ public class SeaweedFileSystemStore {
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
+ SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
}
return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 6b3c72f7d..36c0766a4 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream {
this.statistics = statistics;
this.path = path;
this.entry = entry;
- this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
+ this.contentLength = SeaweedRead.fileSize(entry);
this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
@@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
- long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
+ long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}