aboutsummaryrefslogtreecommitdiff
path: root/other/java/hdfs3
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/hdfs3')
-rw-r--r--other/java/hdfs3/README.md9
-rw-r--r--other/java/hdfs3/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs3/pom.xml2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java85
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java74
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java45
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java9
7 files changed, 132 insertions, 94 deletions
diff --git a/other/java/hdfs3/README.md b/other/java/hdfs3/README.md
index f1afee264..e08f02a7c 100644
--- a/other/java/hdfs3/README.md
+++ b/other/java/hdfs3/README.md
@@ -130,6 +130,15 @@ The test suite covers:
<name>fs.seaweed.filer.port.grpc</name>
<value>18888</value>
</property>
+ <!-- Optional: replication configuration, resolved in this priority order:
+ 1) If set to a non-empty value (e.g. "001") - that value is used as-is
+ 2) If set to the empty string "" - the SeaweedFS filer's default replication is used
+ 3) If not configured (property not present) - the HDFS replication parameter is used, as "00N" with N = replication - 1
+ -->
+ <!-- <property>
+ <name>fs.seaweed.replication</name>
+ <value>001</value>
+ </property> -->
</configuration>
```
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index d3c2751a5..c6579c3fb 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -572,7 +572,7 @@
</dependency>
</dependencies>
<properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
+ <seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 061d4d700..824db8264 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
+ <seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 58fcaf975..513266d69 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -59,7 +59,7 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000);
+ int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000);
setConf(conf);
this.uri = uri;
@@ -85,29 +85,45 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
+
+ // Use BufferedFSInputStream for all streams (like RawLocalFileSystem).
+ // This ensures proper position tracking for positioned reads, which is
+ // critical for Parquet.
+ return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
- LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
- return null;
+ LOG.error("Failed to open file: {} bufferSize:{}", path, bufferSize, ex);
+ throw new IOException("Failed to open file: " + path, ex);
}
}
@Override
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress) throws IOException {
+ final short replication, final long blockSize, final Progressable progress) throws IOException {
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
path = qualify(path);
+ final Path finalPath = path; // For use in anonymous inner class
try {
- String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
+ // Priority: 1) non-empty FS_SEAWEED_REPLICATION, 2) empty string -> filer
+ // default, 3) null -> HDFS replication
+ String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION);
+ if (replicaPlacement == null) {
+ // Not configured: fall back to the HDFS replication parameter. This
+ // builds a "00N" replication string, where N = replication - 1 is the
+ // number of extra replicas placed on different servers in the same
+ // rack.
+ replicaPlacement = String.format("%03d", replication - 1);
+ }
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path,
+ overwrite, permission,
+ seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
- LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
- return null;
+ LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
+ throw new IOException("Failed to create file: " + path, ex);
}
}
@@ -119,12 +135,12 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
- FsPermission permission,
- EnumSet<CreateFlag> flags,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress) throws IOException {
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
@@ -144,13 +160,15 @@ public class SeaweedFileSystem extends FileSystem {
LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
path = qualify(path);
+ final Path finalPath = path; // For use in anonymous inner class
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
+ SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path,
+ false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
- LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
- return null;
+ LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex);
+ throw new IOException("Failed to append to file: " + path, ex);
}
}
@@ -283,7 +301,6 @@ public class SeaweedFileSystem extends FileSystem {
seaweedFileSystemStore.setOwner(path, owner, group);
}
-
/**
* Set permission of a path.
*
@@ -334,11 +351,11 @@ public class SeaweedFileSystem extends FileSystem {
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
- * <code>newLength</code> and is immediately available to be reused for
- * write operations such as <code>append</code>, or
- * <code>false</code> if a background process of adjusting the length of
- * the last block has been started, and clients should wait for it to
- * complete before proceeding with further file updates.
+ * <code>newLength</code> and is immediately available to be reused for
+ * write operations such as <code>append</code>, or
+ * <code>false</code> if a background process of adjusting the length of
+ * the last block has been started, and clients should wait for it to
+ * complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
@@ -351,8 +368,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws
- IOException {
+ final boolean createParent) throws IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
@@ -390,7 +406,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
- String snapshotNewName) throws IOException {
+ String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
@@ -412,10 +428,10 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
- * Modifies ACL entries of files and directories. This method can add new ACL
- * entries or modify the permissions on existing ACL entries. All existing
+ * Modifies ACL entries of files and directories. This method can add new ACL
+ * entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
- * changes. (Modifications are merged into the current ACL.)
+ * changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
@@ -431,7 +447,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
- * Removes ACL entries from files and directories. Other ACL entries are
+ * Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
@@ -463,7 +479,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
- * Removes all but the base ACL entries of files and directories. The entries
+ * Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
@@ -485,7 +501,8 @@ public class SeaweedFileSystem extends FileSystem {
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
- * for user, group, and others for compatibility with permission bits.
+ * for user, group, and others for compatibility with permission
+ * bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
@@ -528,7 +545,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
- EnumSet<XAttrSetFlag> flag) throws IOException {
+ EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index f65c1961b..c55d05797 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -59,19 +59,18 @@ public class SeaweedFileSystemStore {
}
public boolean createDirectory(final Path path, UserGroupInformation currentUser,
- final FsPermission permission, final FsPermission umask) {
+ final FsPermission permission, final FsPermission umask) {
LOG.debug("createDirectory path: {} permission: {} umask: {}",
- path,
- permission,
- umask);
+ path,
+ permission,
+ umask);
return filerClient.mkdirs(
- path.toUri().getPath(),
- permissionToMode(permission, true),
- currentUser.getUserName(),
- currentUser.getGroupNames()
- );
+ path.toUri().getPath(),
+ permissionToMode(permission, true),
+ currentUser.getUserName(),
+ currentUser.getGroupNames());
}
public FileStatus[] listEntries(final Path path) throws IOException {
@@ -84,7 +83,7 @@ public class SeaweedFileSystemStore {
}
if (!pathStatus.isDirectory()) {
- return new FileStatus[]{pathStatus};
+ return new FileStatus[] { pathStatus };
}
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
@@ -116,9 +115,9 @@ public class SeaweedFileSystemStore {
public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
- path,
- String.valueOf(isDirectory),
- String.valueOf(recursive));
+ path,
+ String.valueOf(isDirectory),
+ String.valueOf(recursive));
if (path.isRoot()) {
return true;
@@ -146,7 +145,7 @@ public class SeaweedFileSystemStore {
String owner = attributes.getUserName();
String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
return new FileStatus(length, isDir, block_replication, blocksize,
- modification_time, access_time, permission, owner, group, null, path);
+ modification_time, access_time, permission, owner, group, null, path);
}
public FilerProto.Entry lookupEntry(Path path) {
@@ -162,27 +161,29 @@ public class SeaweedFileSystemStore {
if (source.isRoot()) {
return;
}
- LOG.info("rename source: {} destination:{}", source, destination);
+
FilerProto.Entry entry = lookupEntry(source);
if (entry == null) {
LOG.warn("rename non-existing source: {}", source);
return;
}
+
filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
}
public OutputStream createFile(final Path path,
- final boolean overwrite,
- FsPermission permission,
- int bufferSize,
- String replication) throws IOException {
+ final boolean overwrite,
+ FsPermission permission,
+ int bufferSize,
+ String replication) throws IOException {
permission = permission == null ? FsPermission.getFileDefault() : permission;
+
LOG.debug("createFile path: {} overwrite: {} permission: {}",
- path,
- overwrite,
- permission.toString());
+ path,
+ overwrite,
+ permission.toString());
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
long now = System.currentTimeMillis() / 1000L;
@@ -203,20 +204,21 @@ public class SeaweedFileSystemStore {
}
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
- .setName(path.getName())
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setFileMode(permissionToMode(permission, false))
- .setCrtime(now)
- .setMtime(now)
- .setUserName(userGroupInformation.getUserName())
- .clearGroupName()
- .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
- );
+ .setName(path.getName())
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(permissionToMode(permission, false))
+ .setCrtime(now)
+ .setMtime(now)
+ .setUserName(userGroupInformation.getUserName())
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())));
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
}
- return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
+
+ return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize,
+ replication);
}
@@ -231,9 +233,9 @@ public class SeaweedFileSystemStore {
}
return new SeaweedHadoopInputStream(filerClient,
- statistics,
- path.toUri().getPath(),
- entry);
+ statistics,
+ path.toUri().getPath(),
+ entry);
}
public void setOwner(Path path, String owner, String group) {
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
index f26eae597..8ac5a5ab4 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import seaweedfs.client.FilerClient;
@@ -11,12 +10,21 @@ import seaweedfs.client.SeaweedInputStream;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.ByteBuffer;
-public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+/**
+ * SeaweedFS Hadoop InputStream.
+ *
+ * NOTE: This class deliberately does NOT implement ByteBufferReadable, to
+ * match RawLocalFileSystem behavior.
+ * This ensures BufferedFSInputStream is used, which properly handles
+ * position tracking for positioned reads (critical for Parquet and other
+ * formats).
+ */
+public class SeaweedHadoopInputStream extends FSInputStream {
private final SeaweedInputStream seaweedInputStream;
private final Statistics statistics;
+ private final String path;
public SeaweedHadoopInputStream(
final FilerClient filerClient,
@@ -25,6 +33,7 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
final FilerProto.Entry entry) throws IOException {
this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
this.statistics = statistics;
+ this.path = path;
}
@Override
@@ -37,20 +46,6 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
return seaweedInputStream.read(b, off, len);
}
- // implement ByteBufferReadable
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
- int bytesRead = seaweedInputStream.read(buf);
-
- if (bytesRead > 0) {
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
- }
-
- return bytesRead;
- }
-
/**
* Seek to given position in stream.
*
@@ -83,8 +78,10 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
}
/**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * Returns the length of the file that this stream refers to.
+ * Note that the length returned is the length as of the time the stream
+ * was opened.
+ * Specifically, if there have been subsequent appends to the file,
* they wont be reflected in the returned length.
*
* @return length of the file.
@@ -104,8 +101,12 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
return seaweedInputStream.getPos();
}
+ public String getPath() {
+ return path;
+ }
+
/**
- * Seeks a different copy of the data. Returns true if
+ * Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
@@ -139,7 +140,9 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
}
/**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
+ * Gets whether mark and reset are supported by
+ * {@code SeaweedHadoopInputStream}.
+ * Always returns false.
*
* @return always {@code false}
*/
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
index 1740312fe..a1a43820c 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -4,6 +4,8 @@ package seaweed.hdfs;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedOutputStream;
@@ -13,9 +15,12 @@ import java.util.Locale;
public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedHadoopOutputStream.class);
+
public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
+ final long position, final int bufferSize, final String replication) {
super(filerClient, path, entry, position, bufferSize, replication);
+
}
/**
@@ -26,6 +31,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
*/
@Override
public void hsync() throws IOException {
+
if (supportFlush) {
flushInternal();
}
@@ -39,6 +45,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
*/
@Override
public void hflush() throws IOException {
+
if (supportFlush) {
flushInternal();
}