diff options
Diffstat (limited to 'other/java/hdfs3')
7 files changed, 132 insertions, 94 deletions
diff --git a/other/java/hdfs3/README.md b/other/java/hdfs3/README.md index f1afee264..e08f02a7c 100644 --- a/other/java/hdfs3/README.md +++ b/other/java/hdfs3/README.md @@ -130,6 +130,15 @@ The test suite covers: <name>fs.seaweed.filer.port.grpc</name> <value>18888</value> </property> + <!-- Optional: Replication configuration with three priority levels: + 1) If set to non-empty value (e.g. "001") - uses that value + 2) If set to empty string "" - uses SeaweedFS filer's default replication + 3) If not configured (property not present) - uses HDFS replication parameter + --> + <!-- <property> + <name>fs.seaweed.replication</name> + <value>001</value> + </property> --> </configuration> ``` diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index d3c2751a5..c6579c3fb 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -572,7 +572,7 @@ </dependency>
</dependencies>
<properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
+ <seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 061d4d700..824db8264 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>3.80</seaweedfs.client.version> + <seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version> <hadoop.version>3.4.0</hadoop.version> </properties> diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 58fcaf975..513266d69 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -59,7 +59,7 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000); + int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000); setConf(conf); this.uri = uri; @@ -85,29 +85,45 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); - return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); + + // Use BufferedFSInputStream for all streams (like RawLocalFileSystem) + // This ensures proper position tracking for positioned reads (critical for + // Parquet) + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { - LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); - return null; + LOG.error("Failed to open file: {} bufferSize:{}", path, bufferSize, ex); + throw new IOException("Failed to open file: " + path, ex); } } @Override public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, - final short replication, final long blockSize, final Progressable progress) throws IOException { + final short replication, final long blockSize, final Progressable progress) throws IOException { LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize); path = qualify(path); + final Path finalPath = path; // For use in anonymous inner class try { - String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1)); + // Priority: 1) non-empty FS_SEAWEED_REPLICATION, 2) empty string -> filer + // default, 3) null -> HDFS replication + String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION); + if (replicaPlacement == null) { + // Not configured, use HDFS replication parameter. This creates a "00N" + // replication string, + // placing N (replication-1) extra replicas on different servers in the same + // rack. + replicaPlacement = String.format("%03d", replication - 1); + } int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, + overwrite, permission, + seaweedBufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { - LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); - return null; + LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); + throw new IOException("Failed to create file: " + path, ex); } } @@ -119,12 +135,12 @@ public class SeaweedFileSystem extends FileSystem { */ @Override public FSDataOutputStream createNonRecursive(Path path, - FsPermission permission, - EnumSet<CreateFlag> flags, - int bufferSize, - short replication, - long blockSize, - Progressable progress) throws IOException { + FsPermission permission, + EnumSet<CreateFlag> flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { Path parent = path.getParent(); if (parent != null) { // expect this to raise an exception if there is no parent @@ -144,13 +160,15 @@ public class SeaweedFileSystem extends FileSystem { LOG.debug("append path: {} bufferSize:{}", path, bufferSize); path = qualify(path); + final Path finalPath = path; // For use in anonymous inner class try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); + SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path, + false, null, seaweedBufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { - LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); - return null; + LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex); + throw new IOException("Failed to append to file: " + path, ex); } } @@ -283,7 +301,6 @@ public class SeaweedFileSystem extends FileSystem { seaweedFileSystemStore.setOwner(path, owner, group); } - /** * Set permission of a path. * @@ -334,11 +351,11 @@ public class SeaweedFileSystem extends FileSystem { * @param f The path to the file to be truncated * @param newLength The size the file is to be truncated to * @return <code>true</code> if the file has been truncated to the desired - * <code>newLength</code> and is immediately available to be reused for - * write operations such as <code>append</code>, or - * <code>false</code> if a background process of adjusting the length of - * the last block has been started, and clients should wait for it to - * complete before proceeding with further file updates. + * <code>newLength</code> and is immediately available to be reused for + * write operations such as <code>append</code>, or + * <code>false</code> if a background process of adjusting the length of + * the last block has been started, and clients should wait for it to + * complete before proceeding with further file updates. * @throws IOException IO failure * @throws UnsupportedOperationException if the operation is unsupported * (default). @@ -351,8 +368,7 @@ public class SeaweedFileSystem extends FileSystem { @Override public void createSymlink(final Path target, final Path link, - final boolean createParent) throws - IOException { + final boolean createParent) throws IOException { // Supporting filesystems should override this method throw new UnsupportedOperationException( "Filesystem does not support symlinks!"); @@ -390,7 +406,7 @@ public class SeaweedFileSystem extends FileSystem { */ @Override public void renameSnapshot(Path path, String snapshotOldName, - String snapshotNewName) throws IOException { + String snapshotNewName) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() + " doesn't support renameSnapshot"); } @@ -412,10 +428,10 @@ public class SeaweedFileSystem extends FileSystem { } /** - * Modifies ACL entries of files and directories. This method can add new ACL - * entries or modify the permissions on existing ACL entries. All existing + * Modifies ACL entries of files and directories. This method can add new ACL + * entries or modify the permissions on existing ACL entries. All existing * ACL entries that are not specified in this call are retained without - * changes. (Modifications are merged into the current ACL.) + * changes. (Modifications are merged into the current ACL.) * * @param path Path to modify * @param aclSpec List<AclEntry> describing modifications @@ -431,7 +447,7 @@ public class SeaweedFileSystem extends FileSystem { } /** - * Removes ACL entries from files and directories. Other ACL entries are + * Removes ACL entries from files and directories. Other ACL entries are * retained. * * @param path Path to modify @@ -463,7 +479,7 @@ public class SeaweedFileSystem extends FileSystem { } /** - * Removes all but the base ACL entries of files and directories. The entries + * Removes all but the base ACL entries of files and directories. The entries * for user, group, and others are retained for compatibility with permission * bits. * @@ -485,7 +501,8 @@ public class SeaweedFileSystem extends FileSystem { * * @param path Path to modify * @param aclSpec List describing modifications, which must include entries - * for user, group, and others for compatibility with permission bits. + * for user, group, and others for compatibility with permission + * bits. * @throws IOException if an ACL could not be modified * @throws UnsupportedOperationException if the operation is unsupported * (default outcome). @@ -528,7 +545,7 @@ public class SeaweedFileSystem extends FileSystem { */ @Override public void setXAttr(Path path, String name, byte[] value, - EnumSet<XAttrSetFlag> flag) throws IOException { + EnumSet<XAttrSetFlag> flag) throws IOException { throw new UnsupportedOperationException(getClass().getSimpleName() + " doesn't support setXAttr"); } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index f65c1961b..c55d05797 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -59,19 +59,18 @@ public class SeaweedFileSystemStore { } public boolean createDirectory(final Path path, UserGroupInformation currentUser, - final FsPermission permission, final FsPermission umask) { + final FsPermission permission, final FsPermission umask) { LOG.debug("createDirectory path: {} permission: {} umask: {}", - path, - permission, - umask); + path, + permission, + umask); return filerClient.mkdirs( - path.toUri().getPath(), - permissionToMode(permission, true), - currentUser.getUserName(), - currentUser.getGroupNames() - ); + path.toUri().getPath(), + permissionToMode(permission, true), + currentUser.getUserName(), + currentUser.getGroupNames()); } public FileStatus[] listEntries(final Path path) throws IOException { @@ -84,7 +83,7 @@ public class SeaweedFileSystemStore { } if (!pathStatus.isDirectory()) { - return new FileStatus[]{pathStatus}; + return new FileStatus[] { pathStatus }; } List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); @@ -116,9 +115,9 @@ public class SeaweedFileSystemStore { public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) { LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", - path, - String.valueOf(isDirectory), - String.valueOf(recursive)); + path, + String.valueOf(isDirectory), + String.valueOf(recursive)); if (path.isRoot()) { return true; @@ -146,7 +145,7 @@ public class SeaweedFileSystemStore { String owner = attributes.getUserName(); String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : ""; return new FileStatus(length, isDir, block_replication, blocksize, - modification_time, access_time, permission, owner, group, null, path); + modification_time, access_time, permission, owner, group, null, path); } public FilerProto.Entry lookupEntry(Path path) { @@ -162,27 +161,29 @@ public class SeaweedFileSystemStore { if (source.isRoot()) { return; } - LOG.info("rename source: {} destination:{}", source, destination); + FilerProto.Entry entry = lookupEntry(source); if (entry == null) { LOG.warn("rename non-existing source: {}", source); return; } + filerClient.mv(source.toUri().getPath(), destination.toUri().getPath()); } public OutputStream createFile(final Path path, - final boolean overwrite, - FsPermission permission, - int bufferSize, - String replication) throws IOException { + final boolean overwrite, + FsPermission permission, + int bufferSize, + String replication) throws IOException { permission = permission == null ? FsPermission.getFileDefault() : permission; + LOG.debug("createFile path: {} overwrite: {} permission: {}", - path, - overwrite, - permission.toString()); + path, + overwrite, + permission.toString()); UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); long now = System.currentTimeMillis() / 1000L; @@ -203,20 +204,21 @@ public class SeaweedFileSystemStore { } if (entry == null) { entry = FilerProto.Entry.newBuilder() - .setName(path.getName()) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setFileMode(permissionToMode(permission, false)) - .setCrtime(now) - .setMtime(now) - .setUserName(userGroupInformation.getUserName()) - .clearGroupName() - .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) - ); + .setName(path.getName()) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(permissionToMode(permission, false)) + .setCrtime(now) + .setMtime(now) + .setUserName(userGroupInformation.getUserName()) + .clearGroupName() + .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))); SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } - return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); + + return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, + replication); } @@ -231,9 +233,9 @@ public class SeaweedFileSystemStore { } return new SeaweedHadoopInputStream(filerClient, - statistics, - path.toUri().getPath(), - entry); + statistics, + path.toUri().getPath(), + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java index f26eae597..8ac5a5ab4 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java @@ -2,7 +2,6 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; import seaweedfs.client.FilerClient; @@ -11,12 +10,21 @@ import seaweedfs.client.SeaweedInputStream; import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; -public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable { +/** + * SeaweedFS Hadoop InputStream. + * + * NOTE: Does NOT implement ByteBufferReadable to match RawLocalFileSystem + * behavior. + * This ensures BufferedFSInputStream is used, which properly handles position + * tracking + * for positioned reads (critical for Parquet and other formats). + */ +public class SeaweedHadoopInputStream extends FSInputStream { private final SeaweedInputStream seaweedInputStream; private final Statistics statistics; + private final String path; public SeaweedHadoopInputStream( final FilerClient filerClient, @@ -25,6 +33,7 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe final FilerProto.Entry entry) throws IOException { this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); this.statistics = statistics; + this.path = path; } @Override @@ -37,20 +46,6 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe return seaweedInputStream.read(b, off, len); } - // implement ByteBufferReadable - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - int bytesRead = seaweedInputStream.read(buf); - - if (bytesRead > 0) { - if (statistics != null) { - statistics.incrementBytesRead(bytesRead); - } - } - - return bytesRead; - } - /** * Seek to given position in stream. * @@ -83,8 +78,10 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe } /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * Returns the length of the file that this stream refers to. Note that the + * length returned is the length + * as of the time the Stream was opened. Specifically, if there have been + * subsequent appends to the file, * they wont be reflected in the returned length. * * @return length of the file. @@ -104,8 +101,12 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe return seaweedInputStream.getPos(); } + public String getPath() { + return path; + } + /** - * Seeks a different copy of the data. Returns true if + * Seeks a different copy of the data. Returns true if * found a new source, false otherwise. * * @throws IOException throws {@link IOException} if there is an error @@ -139,7 +140,9 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe } /** - * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. + * gets whether mark and reset are supported by + * {@code SeaweedHadoopInputStream}. + * Always returns false. * * @return always {@code false} */ diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java index 1740312fe..a1a43820c 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -4,6 +4,8 @@ package seaweed.hdfs; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedOutputStream; @@ -13,9 +15,12 @@ import java.util.Locale; public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities { + private static final Logger LOG = LoggerFactory.getLogger(SeaweedHadoopOutputStream.class); + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { + final long position, final int bufferSize, final String replication) { super(filerClient, path, entry, position, bufferSize, replication); + } /** @@ -26,6 +31,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy */ @Override public void hsync() throws IOException { + if (supportFlush) { flushInternal(); } @@ -39,6 +45,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy */ @Override public void hflush() throws IOException { + if (supportFlush) { flushInternal(); } |
