From 703057bff9d8a1ef02af43722e0fd5bd6c536a7e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 20:15:43 -0700 Subject: mirror changes from hdfs2 --- .../seaweed/hdfs/BufferedSeaweedInputStream.java | 49 ++++++++++++++++++++++ .../main/java/seaweed/hdfs/SeaweedFileSystem.java | 2 +- 2 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java (limited to 'other/java/hdfs3/src') diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java new file mode 100644 index 000000000..0bee6e43f --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java @@ -0,0 +1,49 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + +import java.io.BufferedInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable { + + SeaweedInputStream t; + + protected BufferedSeaweedInputStream(InputStream in, int bufferSize) { + super(new BufferedInputStream(in, bufferSize)); + t = (SeaweedInputStream)in; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + return this.t.read(position,buffer,offset,length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + this.t.readFully(position,buffer,offset,length); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + this.t.readFully(position,buffer); + } + + @Override + public void seek(long pos) throws IOException { + this.t.seek(pos); + } + + @Override + public long getPos() throws IOException { + return this.t.getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return this.t.seekToNewSource(targetPos); + } +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 2341d335d..585fa39a3 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -76,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); + return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; -- cgit v1.2.3 From 6b41c5250bd8119c8fb5485ca9a2f2e42798a91e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 21:33:10 -0700 Subject: Hadoop file system: 1.4.3 added buffered fs input stream --- .../seaweed/hdfs/BufferedSeaweedInputStream.java | 49 ---------------------- .../main/java/seaweed/hdfs/SeaweedFileSystem.java | 4 +- .../java/seaweed/hdfs/SeaweedFileSystemStore.java | 5 ++- 3 files changed, 5 insertions(+), 53 deletions(-) delete mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java (limited to 'other/java/hdfs3/src') diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java deleted file mode 100644 index 0bee6e43f..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java +++ /dev/null @@ -1,49 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; - -import java.io.BufferedInputStream; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable { - - SeaweedInputStream t; - - protected BufferedSeaweedInputStream(InputStream in, int bufferSize) { - super(new BufferedInputStream(in, bufferSize)); - t = (SeaweedInputStream)in; - } - - @Override - public int read(long position, byte[] buffer, int offset, int length) throws IOException { - return this.t.read(position,buffer,offset,length); - } - - @Override - public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - this.t.readFully(position,buffer,offset,length); - } - - @Override - public void readFully(long position, byte[] buffer) throws IOException { - this.t.readFully(position,buffer); - } - - @Override - public void seek(long pos) throws IOException { - this.t.seek(pos); - } - - @Override - public long getPos() throws IOException { - return this.t.getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return this.t.seekToNewSource(targetPos); - } -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 585fa39a3..fd8877806 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -75,8 +75,8 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { - InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024)); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index d9c6d6f0d..0db6a1f49 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -1,5 +1,6 @@ package seaweed.hdfs; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -207,8 +208,8 @@ public class SeaweedFileSystemStore { } - public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, + int bufferSize) throws IOException { LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); -- cgit v1.2.3 From b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:06:09 -0700 Subject: add read ahead input stream --- other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'other/java/hdfs3/src') diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fd8877806..836bb4db5 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; +import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; -- cgit v1.2.3 From 15dc0a704db7aa542471b56f10ceb749dc041b12 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:11:24 -0700 Subject: Revert "add read ahead input stream" This reverts commit b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6. --- other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'other/java/hdfs3/src') diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 836bb4db5..fd8877806 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; -import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; -- cgit v1.2.3 From 13bfe5deefcb83a5a89f9f15e7a15c29f97b1730 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 16 Aug 2020 21:14:39 -0700 Subject: same logic for reading random access files from Go --- .../java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 4 ++-- other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'other/java/hdfs3/src') diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 0db6a1f49..53185367a 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -124,7 +124,7 @@ public class SeaweedFileSystemStore { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { FilerProto.FuseAttributes attributes = entry.getAttributes(); - long length = SeaweedRead.totalSize(entry.getChunksList()); + long length = SeaweedRead.fileSize(entry); boolean isDir = entry.getIsDirectory(); int block_replication = 1; int blocksize = 512; @@ -185,7 +185,7 @@ public class SeaweedFileSystemStore { entry.mergeFrom(existingEntry); entry.getAttributesBuilder().setMtime(now); LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + writePosition = SeaweedRead.fileSize(existingEntry); replication = existingEntry.getAttributes().getReplication(); } } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 6b3c72f7d..36c0766a4 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream { this.statistics = statistics; this.path = path; this.entry = entry; - this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); + this.contentLength = SeaweedRead.fileSize(entry); this.bufferSize = bufferSize; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); @@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } - long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len); + long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } -- cgit v1.2.3 From 5eee4983f36f55a2a01381e8af278b28919dbe90 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 16 Sep 2020 17:18:18 -0700 Subject: 1.4.7 hdfs configurable fs.seaweed.buffer.size --- .../main/java/seaweed/hdfs/SeaweedFileSystem.java | 29 +++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) (limited to 'other/java/hdfs3/src') diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fd8877806..6551548fa 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -5,7 +5,6 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @@ -14,20 +13,19 @@ import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; import java.util.Map; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; - public class SeaweedFileSystem extends FileSystem { - public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final int FS_SEAWEED_DEFAULT_PORT = 8888; + public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; + public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); @@ -75,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { - FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; @@ -93,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem { try { String replicaPlacement = String.format("%03d", replication - 1); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); @@ -103,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem { /** * {@inheritDoc} + * * @throws FileNotFoundException if the parent directory is not present -or - * is not a directory. + * is not a directory. */ @Override public FSDataOutputStream createNonRecursive(Path path, @@ -121,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem { throw new FileAlreadyExistsException("Not a directory: " + parent); } } + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); return create(path, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, blockSize, progress); + replication, seaweedBufferSize, progress); } @Override @@ -133,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); @@ -338,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem { @Override public void createSymlink(final Path target, final Path link, - final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, + final boolean createParent) throws IOException { // Supporting filesystems should override this method throw new UnsupportedOperationException( -- cgit v1.2.3