Diffstat (limited to 'other')
-rw-r--r--  other/java/client/pom.xml | 4
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java | 93
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java | 33
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java | 75
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 41
-rw-r--r--  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java | 104
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java | 303
-rw-r--r--  other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java | 302
-rw-r--r--  other/java/examples/pom.xml | 4
-rw-r--r--  other/java/hdfs-over-ftp/pom.xml | 120
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java | 14
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java | 27
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java | 71
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java | 98
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java | 71
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java | 43
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java | 102
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java | 333
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java | 14
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java | 104
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java | 72
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java | 239
-rw-r--r--  other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java | 453
-rw-r--r--  other/java/hdfs-over-ftp/src/main/resources/application.yml | 15
-rw-r--r--  other/java/hdfs-over-ftp/src/main/resources/assembly.xml | 39
-rw-r--r--  other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml | 40
-rw-r--r--  other/java/hdfs-over-ftp/users.properties | 12
-rw-r--r--  other/java/hdfs2/README.md | 190
-rw-r--r--  other/java/hdfs2/dependency-reduced-pom.xml | 333
-rw-r--r--  other/java/hdfs2/pom.xml | 195
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java | 25
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java | 35
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 634
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 291
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java | 150
-rw-r--r--  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java | 16
-rw-r--r--  other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java | 90
-rw-r--r--  other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java | 379
-rw-r--r--  other/java/hdfs3/README.md | 9
-rw-r--r--  other/java/hdfs3/dependency-reduced-pom.xml | 2
-rw-r--r--  other/java/hdfs3/pom.xml | 2
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 85
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 74
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java | 45
-rw-r--r--  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java | 9
45 files changed, 866 insertions, 4524 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 682582f7b..1989b9b05 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.seaweedfs</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>3.80</version>
+ <version>3.80.1-SNAPSHOT</version>
<name>SeaweedFS Java Client</name>
<description>A java client for SeaweedFS.</description>
@@ -33,7 +33,7 @@
<properties>
<protobuf.version>3.25.5</protobuf.version>
<!-- follow https://github.com/grpc/grpc-java -->
- <grpc.version>1.75.0</grpc.version>
+ <grpc.version>1.77.0</grpc.version>
<guava.version>32.0.0-jre</guava.version>
</properties>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 44977d186..320a754ea 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -8,10 +8,13 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class FilerGrpcClient {
@@ -29,10 +32,12 @@ public class FilerGrpcClient {
public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
protected int randomClientId;
- private final ManagedChannel channel;
- private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
- private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
- private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+
+ // Connection pool to handle concurrent requests
+ private static final int CHANNEL_POOL_SIZE = 4;
+ private final List<ManagedChannel> channelPool;
+ private final AtomicInteger channelIndex = new AtomicInteger(0);
+
private boolean cipher = false;
private String collection = "";
private String replication = "";
@@ -45,26 +50,18 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) {
- this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort)
- .usePlaintext()
- .maxInboundMessageSize(1024 * 1024 * 1024) :
- cn.isEmpty() ?
- NettyChannelBuilder.forAddress(host, grpcPort)
- .maxInboundMessageSize(1024 * 1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- .sslContext(sslContext) :
- NettyChannelBuilder.forAddress(host, grpcPort)
- .maxInboundMessageSize(1024 * 1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- .overrideAuthority(cn) //will not check hostname of the filer server
- .sslContext(sslContext)
- );
-
filerAddress = SeaweedUtil.joinHostPort(host, port);
- FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
- this.getBlockingStub().getFilerConfiguration(
+ // Create a pool of channels for better concurrency handling
+ channelPool = new ArrayList<>(CHANNEL_POOL_SIZE);
+
+ for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
+ channelPool.add(createChannelBuilder(host, grpcPort, sslContext, cn).build());
+ }
+
+ // Get filer configuration using first channel
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc
+ .newBlockingStub(channelPool.get(0)).getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
cipher = filerConfigurationResponse.getCipher();
collection = filerConfigurationResponse.getCollection();
@@ -73,11 +70,39 @@ public class FilerGrpcClient {
}
- private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
- channel = channelBuilder.build();
- blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
- asyncStub = SeaweedFilerGrpc.newStub(channel);
- futureStub = SeaweedFilerGrpc.newFutureStub(channel);
+ /**
+ * Creates a NettyChannelBuilder with common gRPC configuration.
+ * Supports plaintext and TLS modes with optional authority override.
+ */
+ private NettyChannelBuilder createChannelBuilder(String host, int grpcPort, SslContext sslContext, String cn) {
+ NettyChannelBuilder builder = NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
+ .maxInboundMetadataSize(1024 * 1024)
+ .flowControlWindow(16 * 1024 * 1024)
+ .initialFlowControlWindow(16 * 1024 * 1024)
+ .maxHeaderListSize(16 * 1024 * 1024)
+ .keepAliveTime(30, TimeUnit.SECONDS)
+ .keepAliveTimeout(10, TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(true)
+ .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
+ .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024);
+
+ if (sslContext == null) {
+ builder.usePlaintext();
+ } else {
+ builder.negotiationType(NegotiationType.TLS).sslContext(sslContext);
+ if (!cn.isEmpty()) {
+ builder.overrideAuthority(cn);
+ }
+ }
+ return builder;
+ }
+
+ // Get a channel from the pool using round-robin
+ private ManagedChannel getChannel() {
+ int raw = channelIndex.getAndIncrement();
+ int index = Math.floorMod(raw, CHANNEL_POOL_SIZE);
+ return channelPool.get(index);
}
public boolean isCipher() {
@@ -93,19 +118,25 @@ public class FilerGrpcClient {
}
public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ for (ManagedChannel channel : channelPool) {
+ channel.shutdown();
+ }
+ for (ManagedChannel channel : channelPool) {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ }
}
public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() {
- return blockingStub;
+ // Return a new stub using a channel from the pool (round-robin)
+ return SeaweedFilerGrpc.newBlockingStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() {
- return asyncStub;
+ return SeaweedFilerGrpc.newStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
- return futureStub;
+ return SeaweedFilerGrpc.newFutureStub(getChannel());
}
public void setAccessVolumeServerDirectly() {
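
Note on the channel pool above: getChannel() uses Math.floorMod rather than the % operator so the index stays in range even after the AtomicInteger counter overflows Integer.MAX_VALUE and wraps to a negative value. A minimal, self-contained sketch of the same round-robin pattern (the class name and generic signature are illustrative, not part of this patch):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical illustration of the round-robin selection used by getChannel().
    final class RoundRobinPicker<T> {
        private final List<T> pool;
        private final AtomicInteger counter = new AtomicInteger(0);

        RoundRobinPicker(List<T> pool) {
            this.pool = pool;
        }

        T next() {
            // getAndIncrement() eventually wraps to negative values; floorMod maps
            // any int, negative or not, into [0, pool.size()).
            int raw = counter.getAndIncrement();
            return pool.get(Math.floorMod(raw, pool.size()));
        }
    }
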
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 64754321b..48a508db0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream {
private final long contentLength;
private FilerProto.Entry entry;
- private long position = 0; // cursor of the file
+ private long position = 0; // cursor of the file
private boolean closed = false;
@@ -44,7 +44,6 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
-
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -64,7 +63,6 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
-
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -99,7 +97,8 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (len > (b.length - off)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ throw new IllegalArgumentException(
+ "requested read length is more than will fit after requested offset in buffer");
}
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
@@ -114,21 +113,30 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
+ return -1; // Hadoop prefers -1 to EOFException
}
long bytesRead = 0;
int len = buf.remaining();
- if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) {
- entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf);
+ if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) {
+ entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf);
+ bytesRead = len; // FIX: Update bytesRead after inline copy
} else {
- bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ // Use the known contentLength instead of recomputing from the entry to avoid
+ // races
+ bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf,
+ this.contentLength);
}
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
+ // Clamp premature EOFs: do not return -1 unless position >= contentLength
+ if (bytesRead < 0 && position < contentLength) {
+ bytesRead = 0;
+ }
+
if (bytesRead > 0) {
this.position += bytesRead;
}
@@ -188,12 +196,15 @@ public class SeaweedInputStream extends InputStream {
}
final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
+ ? (int) remaining
+ : Integer.MAX_VALUE;
}
/**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * Returns the length of the file that this stream refers to. Note that the
+ * length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been
+ * subsequent appends to the file,
* they wont be reflected in the returned length.
*
* @return length of the file.
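
The EOF clamp above matters because callers such as Parquet's H2SeekableInputStream loop over read() and treat -1 strictly as end-of-file; a -1 returned while position < contentLength makes such a loop abort with bytes still missing. A hedged sketch of the calling pattern this contract supports (the helper name is an assumption, not taken from the patch):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    final class ReadFullyExample {
        // Hypothetical readFully-style loop: keep calling read() until the requested
        // range is filled, and treat -1 only as a genuine end-of-file condition.
        static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
            int remaining = len;
            while (remaining > 0) {
                int n = in.read(buf, off, remaining);
                if (n < 0) {
                    throw new EOFException("unexpected EOF, still expected " + remaining + " bytes");
                }
                off += n;
                remaining -= n;
            }
        }
    }
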
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 68c281992..ea4c99805 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -23,15 +23,13 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry;
- private long position;
+ private long position; // Flushed bytes (committed to service)
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private ByteBuffer buffer;
- private long outputIndex;
private String replication = "";
private String collection = "";
@@ -44,7 +42,8 @@ public class SeaweedOutputStream extends OutputStream {
}
public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
+ final long position, final int bufferSize, final String replication) {
+
this.filerClient = filerClient;
this.replication = replication;
this.path = path;
@@ -58,8 +57,7 @@ public class SeaweedOutputStream extends OutputStream {
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
120L,
TimeUnit.SECONDS,
@@ -77,8 +75,7 @@ public class SeaweedOutputStream extends OutputStream {
.setFileMode(0755)
.setCrtime(now)
.setMtime(now)
- .clearGroupName()
- );
+ .clearGroupName());
}
}
@@ -86,14 +83,35 @@ public class SeaweedOutputStream extends OutputStream {
public void setReplication(String replication) {
this.replication = replication;
}
+
public void setCollection(String collection) {
this.collection = collection;
}
+ /**
+ * Get the current position in the output stream.
+ * This returns the total position including both flushed and buffered data.
+ *
+ * @return current position (flushed + buffered bytes)
+ */
+ public synchronized long getPos() throws IOException {
+ // Guard against NPE if called after close()
+ if (buffer == null) {
+ return position;
+ }
+
+ // Return current position (flushed + buffered)
+ return position + buffer.position();
+ }
+
public static String getParentDirectory(String path) {
int protoIndex = path.indexOf("://");
if (protoIndex >= 0) {
- int pathStart = path.indexOf("/", protoIndex+3);
+ int pathStart = path.indexOf("/", protoIndex + 3);
+ if (pathStart < 0) {
+ // No path segment; treat as root (e.g., "seaweedfs://host")
+ return "/";
+ }
path = path.substring(pathStart);
}
if (path.equals("/")) {
@@ -116,6 +134,13 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
+
+ // Set the file size in attributes based on our position
+ // This ensures Parquet footer metadata matches what we actually wrote
+ FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder();
+ attrBuilder.setFileSize(offset);
+ entry.setAttributes(attrBuilder);
+
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) {
throw new IOException(ex);
@@ -125,7 +150,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
+ write(new byte[] { (byte) (byteVal & 0xFF) });
}
@Override
@@ -141,8 +166,6 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
- // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
int currentOffset = off;
int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
@@ -154,9 +177,11 @@ public class SeaweedOutputStream extends OutputStream {
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," +
+ // ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
currentOffset += writableBytes;
+
writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
writableBytes = bufferSize - buffer.position();
@@ -191,7 +216,6 @@ public class SeaweedOutputStream extends OutputStream {
return;
}
- LOG.debug("close path: {}", path);
try {
flushInternal();
threadExecutor.shutdown();
@@ -209,28 +233,35 @@ public class SeaweedOutputStream extends OutputStream {
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (buffer.position() == 0) {
+ int bufferPos = buffer.position();
+
+ if (bufferPos == 0) {
return;
}
- position += submitWriteBufferToService(buffer, position);
+ int written = submitWriteBufferToService(buffer, position);
+ position += written;
buffer = ByteBufferPool.request(bufferSize);
}
- private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
+ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition)
+ throws IOException {
- ((Buffer)bufferToWrite).flip();
+ ((Buffer) bufferToWrite).flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
- // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
- // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ // System.out.println(path + " is going to save [" + (writePosition) + "," +
+ // ((writePosition) + bytesLength) + ")");
+ SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(),
+ bufferToWrite.position(), bufferToWrite.limit(), path);
+ // System.out.println(path + " saved [" + (writePosition) + "," +
+ // ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite);
return null;
});
@@ -318,12 +349,10 @@ public class SeaweedOutputStream extends OutputStream {
private static class WriteOperation {
private final Future<Void> task;
- private final long startOffset;
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
this.task = task;
- this.startOffset = startOffset;
this.length = length;
}
}
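
The getPos() addition is what the new GetPosBufferTest below exercises: a writer that records offsets after each block (Parquet does this for every column chunk) needs flushed plus buffered bytes, otherwise the recorded offsets lag behind the data and the footer points at the wrong places. A minimal usage sketch under that assumption (the helper class is hypothetical; SeaweedOutputStream and FilerClient are the types from this client library):

    import java.io.IOException;

    import seaweedfs.client.FilerClient;
    import seaweedfs.client.SeaweedOutputStream;

    final class GetPosUsage {
        // Write a sequence of blocks and record the end offset of each via getPos(),
        // the way a columnar writer records column-chunk boundaries.
        static long[] writeBlocks(FilerClient filerClient, String path, byte[][] blocks) throws IOException {
            long[] endOffsets = new long[blocks.length];
            SeaweedOutputStream out = new SeaweedOutputStream(filerClient, path);
            try {
                for (int i = 0; i < blocks.length; i++) {
                    out.write(blocks[i]);          // data may still sit in the stream's buffer
                    endOffsets[i] = out.getPos();  // must include buffered bytes to be correct
                }
            } finally {
                out.close();
            }
            return endOffsets;
        }
    }
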
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index cac85d186..3fd184671 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -24,9 +24,10 @@ public class SeaweedRead {
// returns bytesRead
public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
- final long position, final ByteBuffer buf, final long fileSize) throws IOException {
+ final long position, final ByteBuffer buf, final long fileSize) throws IOException {
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
+ int originalRemaining = buf.remaining();
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, originalRemaining);
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
@@ -51,7 +52,7 @@ public class SeaweedRead {
}
}
- //TODO parallel this
+ // TODO parallel this
long readCount = 0;
long startOffset = position;
for (ChunkView chunkView : chunkViews) {
@@ -59,7 +60,7 @@ public class SeaweedRead {
if (startOffset < chunkView.logicOffset) {
long gap = chunkView.logicOffset - startOffset;
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
- buf.position(buf.position()+ (int)gap);
+ buf.position(buf.position() + (int) gap);
readCount += gap;
startOffset += gap;
}
@@ -81,12 +82,17 @@ public class SeaweedRead {
}
- long limit = Math.min(buf.limit(), fileSize);
+ // Fix: Calculate the correct limit based on the read position and requested
+ // size,
+ // not the buffer's absolute limit. This fixes the 78-byte EOF error when
+ // seeking
+ // near the end of the file.
+ long limit = Math.min(position + originalRemaining, fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
- buf.position(buf.position()+ (int)gap);
+ buf.position(buf.position() + (int) gap);
readCount += gap;
startOffset += gap;
}
@@ -94,7 +100,8 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView,
+ FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -105,13 +112,15 @@ public class SeaweedRead {
int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset);
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset,
+ chunkView.logicOffset + chunkView.size, startOffset);
buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
return len;
}
- public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView,
+ FilerProto.Locations locations) throws IOException {
byte[] data = null;
IOException lastException = null;
@@ -214,8 +223,7 @@ public class SeaweedRead {
chunkStart,
isFullChunk,
chunk.cipherKey,
- chunk.isCompressed
- ));
+ chunk.isCompressed));
}
}
return views;
@@ -239,7 +247,10 @@ public class SeaweedRead {
}
public static long fileSize(FilerProto.Entry entry) {
- return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ long chunksSize = totalSize(entry.getChunksList());
+ long attrSize = entry.getAttributes().getFileSize();
+ long finalSize = Math.max(chunksSize, attrSize);
+ return finalSize;
}
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
@@ -263,7 +274,8 @@ public class SeaweedRead {
public final byte[] cipherKey;
public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset,
+ boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
@@ -297,7 +309,8 @@ public class SeaweedRead {
public final byte[] cipherKey;
public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey,
+ boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
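
To make the limit fix above concrete with figures matching the failing case (the same 1275-byte layout appears later in SeaweedStreamIntegrationTest.testRangeReads; the numbers here are illustrative): when reading 78 bytes at position 1197 of a 1275-byte file, the old code computed limit = min(buf.limit(), fileSize) = min(78, 1275) = 78, a buffer-relative value compared against the file offset startOffset = 1197, so the check startOffset < limit never held, the tail of the requested range was never filled, and the caller observed a short read, the "78-byte EOF" described in the comment. The corrected code computes limit = min(position + originalRemaining, fileSize) = min(1197 + 78, 1275) = 1275, so the read is bounded by the actual end of the file.
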
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 88c7cefbe..0fadd53cc 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -2,6 +2,7 @@ package seaweedfs.client;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
+import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -25,13 +26,13 @@ public class SeaweedWrite {
private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
- final String replication,
- String collection,
- final FilerClient filerClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength,
- final String path) throws IOException {
+ final String replication,
+ String collection,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength,
+ final String path) throws IOException {
IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
@@ -60,21 +61,50 @@ public class SeaweedWrite {
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
- final String collection,
- final FilerClient filerClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset,
- final long bytesLength,
- final String path) throws IOException {
- FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
- FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
- .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
- .setDataCenter("")
- .setTtlSec(0)
- .setPath(path)
- .build());
+ final String collection,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength,
+ final String path) throws IOException {
+
+ // Retry assignVolume call for transient network/server errors
+ FilerProto.AssignVolumeResponse response = null;
+ IOException lastException = null;
+ int maxRetries = 3;
+
+ for (int attempt = 0; attempt < maxRetries; attempt++) {
+ try {
+ response = filerClient.getBlockingStub().assignVolume(
+ FilerProto.AssignVolumeRequest.newBuilder()
+ .setCollection(
+ Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
+ .setReplication(
+ Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
+ .setDataCenter("")
+ .setTtlSec(0)
+ .setPath(path)
+ .build());
+ break; // Success, exit retry loop
+ } catch (io.grpc.StatusRuntimeException e) {
+ lastException = new IOException(
+ "assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e);
+ if (attempt < maxRetries - 1) {
+ try {
+ Thread.sleep(100 * (attempt + 1)); // Linear backoff: 100ms, 200ms, 300ms
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted during retry", ie);
+ }
+ }
+ }
+ }
+
+ if (response == null) {
+ throw lastException != null ? lastException
+ : new IOException("assignVolume failed after " + maxRetries + " attempts");
+ }
if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(response.getError());
@@ -83,7 +113,8 @@ public class SeaweedWrite {
String fileId = response.getFileId();
String auth = response.getAuth();
- String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl());
+ String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(),
+ response.getLocation().getPublicUrl());
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
@@ -94,8 +125,6 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
-
return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
@@ -106,27 +135,28 @@ public class SeaweedWrite {
}
public static void writeMeta(final FilerClient filerClient,
- final String parentDirectory,
- final FilerProto.Entry.Builder entry) throws IOException {
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
synchronized (entry) {
- List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(),
+ parentDirectory);
+
entry.clearChunks();
entry.addAllChunks(chunks);
filerClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
.setEntry(entry)
- .build()
- );
+ .build());
}
}
private static String multipartUpload(String targetUrl,
- String auth,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength,
- byte[] cipherKey) throws IOException {
+ String auth,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
MessageDigest md = null;
try {
md = MessageDigest.getInstance("MD5");
@@ -162,8 +192,10 @@ public class SeaweedWrite {
try {
if (response.getStatusLine().getStatusCode() / 100 != 2) {
- if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) {
- throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8"));
+ HttpEntity entity = response.getEntity();
+ if (entity != null && entity.getContentType() != null
+ && entity.getContentType().getValue().equals("application/json")) {
+ throw new IOException(EntityUtils.toString(entity, "UTF-8"));
} else {
throw new IOException(response.getStatusLine().getReasonPhrase());
}
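
The retry added around assignVolume is written inline; the same pattern could be factored into a small reusable helper if other gRPC calls need it. A hedged sketch under that assumption (the helper class and its signature are hypothetical, not part of this patch):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    final class GrpcRetry {
        // Hypothetical retry wrapper for transient gRPC failures, using the same
        // linear backoff as the inlined assignVolume retry (100ms, 200ms, 300ms, ...).
        static <T> T callWithRetry(Callable<T> call, int maxRetries, long baseDelayMs) throws IOException {
            IOException last = null;
            for (int attempt = 0; attempt < maxRetries; attempt++) {
                try {
                    return call.call();
                } catch (io.grpc.StatusRuntimeException e) {
                    last = new IOException("attempt " + (attempt + 1) + "/" + maxRetries + " failed: " + e.getMessage(), e);
                    if (attempt < maxRetries - 1) {
                        try {
                            Thread.sleep(baseDelayMs * (attempt + 1));
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            throw new IOException("Interrupted during retry", ie);
                        }
                    }
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }
            throw last != null ? last : new IOException("call failed after " + maxRetries + " attempts");
        }
    }

A call site would then look roughly like GrpcRetry.callWithRetry(() -> filerClient.getBlockingStub().assignVolume(request), 3, 100L), keeping the retry policy in one place.
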
diff --git a/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java
new file mode 100644
index 000000000..d49e17e72
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java
@@ -0,0 +1,303 @@
+package seaweedfs.client;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test to reproduce the Parquet EOF issue.
+ *
+ * The issue: When Parquet writes column chunks, it calls getPos() to record
+ * offsets.
+ * If getPos() returns a position that doesn't include buffered (unflushed)
+ * data,
+ * the footer metadata will have incorrect offsets.
+ *
+ * This test simulates Parquet's behavior:
+ * 1. Write some data (column chunk 1)
+ * 2. Call getPos() - Parquet records this as the END of chunk 1
+ * 3. Write more data (column chunk 2)
+ * 4. Call getPos() - Parquet records this as the END of chunk 2
+ * 5. Close the file
+ * 6. Verify that the recorded positions match the actual file content
+ *
+ * Prerequisites:
+ * - SeaweedFS master, volume server, and filer must be running
+ * - Default ports: filer HTTP 8888, filer gRPC 18888
+ *
+ * To run:
+ * export SEAWEEDFS_TEST_ENABLED=true
+ * cd other/java/client
+ * mvn test -Dtest=GetPosBufferTest
+ */
+public class GetPosBufferTest {
+
+ private FilerClient filerClient;
+ private static final String TEST_ROOT = "/test-getpos-buffer";
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+
+ @Before
+ public void setUp() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+
+ String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+ String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
+
+ filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort));
+
+ // Clean up any existing test directory
+ if (filerClient.exists(TEST_ROOT)) {
+ filerClient.rm(TEST_ROOT, true, true);
+ }
+
+ // Create test root directory
+ filerClient.mkdirs(TEST_ROOT, 0755);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ if (filerClient != null) {
+ filerClient.rm(TEST_ROOT, true, true);
+ filerClient.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetPosWithBufferedData() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with buffered data ===");
+
+ String testPath = TEST_ROOT + "/getpos-test.bin";
+
+ // Simulate what Parquet does when writing column chunks
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write "column chunk 1" - 100 bytes
+ byte[] chunk1 = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ chunk1[i] = (byte) i;
+ }
+ outputStream.write(chunk1);
+
+ // Parquet calls getPos() here to record end of chunk 1
+ long posAfterChunk1 = outputStream.getPos();
+ System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1);
+ assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1);
+
+ // Write "column chunk 2" - 200 bytes
+ byte[] chunk2 = new byte[200];
+ for (int i = 0; i < 200; i++) {
+ chunk2[i] = (byte) (i + 100);
+ }
+ outputStream.write(chunk2);
+
+ // Parquet calls getPos() here to record end of chunk 2
+ long posAfterChunk2 = outputStream.getPos();
+ System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2);
+ assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2);
+
+ // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!)
+ byte[] chunk3 = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ chunk3[i] = (byte) (i + 50);
+ }
+ outputStream.write(chunk3);
+
+ // Parquet calls getPos() here to record end of chunk 3
+ long posAfterChunk3 = outputStream.getPos();
+ System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3);
+ assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3);
+
+ // Close to flush everything
+ outputStream.close();
+ System.out.println("File closed successfully");
+
+ // Now read the file and verify its actual size matches what getPos() reported
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ long actualFileSize = SeaweedRead.fileSize(entry);
+ System.out.println("Actual file size on disk: " + actualFileSize);
+
+ assertEquals("File size should match the last getPos() value", 378, actualFileSize);
+
+ // Now read the file and verify we can read all the data
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ byte[] readBuffer = new byte[500]; // Larger buffer to read everything
+ int totalRead = 0;
+ int bytesRead;
+ while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) {
+ totalRead += bytesRead;
+ }
+ inputStream.close();
+
+ System.out.println("Total bytes read: " + totalRead);
+ assertEquals("Should read exactly 378 bytes", 378, totalRead);
+
+ // Verify the data is correct
+ for (int i = 0; i < 100; i++) {
+ assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]);
+ }
+ for (int i = 0; i < 200; i++) {
+ assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]);
+ }
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]);
+ }
+
+ System.out.println("SUCCESS: All data verified correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithSmallWrites() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ===");
+
+ String testPath = TEST_ROOT + "/small-writes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Parquet writes column data in small chunks and frequently calls getPos()
+ String[] columnData = { "Alice", "Bob", "Charlie", "David" };
+ long[] recordedPositions = new long[columnData.length];
+
+ for (int i = 0; i < columnData.length; i++) {
+ byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8);
+ outputStream.write(data);
+
+ // Parquet calls getPos() after each value to track offsets
+ recordedPositions[i] = outputStream.getPos();
+ System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]);
+ }
+
+ long finalPos = outputStream.getPos();
+ System.out.println("Final position before close: " + finalPos);
+
+ outputStream.close();
+
+ // Verify file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size should match final getPos()", finalPos, actualFileSize);
+
+ // Verify we can read using the recorded positions
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ long currentPos = 0;
+ for (int i = 0; i < columnData.length; i++) {
+ long nextPos = recordedPositions[i];
+ int length = (int) (nextPos - currentPos);
+
+ byte[] buffer = new byte[length];
+ int bytesRead = inputStream.read(buffer, 0, length);
+
+ assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead);
+
+ String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
+ System.out.println("Read at offset " + currentPos + ": '" + readData + "'");
+ assertEquals("Data mismatch", columnData[i], readData);
+
+ currentPos = nextPos;
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithExactly78BytesBuffered() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ===");
+
+ String testPath = TEST_ROOT + "/78-bytes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write some initial data
+ byte[] initial = new byte[1000];
+ for (int i = 0; i < 1000; i++) {
+ initial[i] = (byte) i;
+ }
+ outputStream.write(initial);
+ outputStream.flush(); // Ensure this is flushed
+
+ long posAfterFlush = outputStream.getPos();
+ System.out.println("Position after 1000 bytes + flush: " + posAfterFlush);
+ assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush);
+
+ // Now write EXACTLY 78 bytes (the problematic buffer size in our bug)
+ byte[] problematicChunk = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ problematicChunk[i] = (byte) (i + 50);
+ }
+ outputStream.write(problematicChunk);
+
+ // DO NOT FLUSH - this is the bug scenario!
+ // Parquet calls getPos() here while the 78 bytes are still buffered
+ long posWithBufferedData = outputStream.getPos();
+ System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData);
+
+ // This MUST return 1078, not 1000!
+ assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData);
+
+ // Now close (which will flush)
+ outputStream.close();
+
+ // Verify actual file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size must be 1078", 1078, actualFileSize);
+
+ // Try to read at position 1000 for 78 bytes (what Parquet would try)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1000);
+
+ byte[] readBuffer = new byte[78];
+ int bytesRead = inputStream.read(readBuffer, 0, 78);
+
+ System.out.println("Bytes read at position 1000: " + bytesRead);
+ assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead);
+
+ // Verify the data matches
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]);
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: getPos() correctly includes buffered data!\n");
+ }
+}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
index f384e059f..3cfb2ce9e 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
@@ -28,22 +28,21 @@ public class SeaweedStreamIntegrationTest {
private FilerClient filerClient;
private static final String TEST_ROOT = "/test-stream-integration";
- private static final boolean TESTS_ENABLED =
- "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
@Before
public void setUp() throws Exception {
if (!TESTS_ENABLED) {
return;
}
-
+
filerClient = new FilerClient("localhost", 18888);
-
+
// Clean up any existing test directory
if (filerClient.exists(TEST_ROOT)) {
filerClient.rm(TEST_ROOT, true, true);
}
-
+
// Create test root directory
filerClient.mkdirs(TEST_ROOT, 0755);
}
@@ -53,7 +52,7 @@ public class SeaweedStreamIntegrationTest {
if (!TESTS_ENABLED || filerClient == null) {
return;
}
-
+
try {
// Clean up test directory
if (filerClient.exists(TEST_ROOT)) {
@@ -70,30 +69,29 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/small.txt";
String testContent = "Hello, SeaweedFS!";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[testContent.length()];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read all bytes", testContent.length(), bytesRead);
assertEquals("Content should match", testContent, new String(buffer, StandardCharsets.UTF_8));
}
@@ -104,43 +102,42 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/large.bin";
int fileSize = 10 * 1024 * 1024; // 10 MB
-
+
// Generate random data
byte[] originalData = new byte[fileSize];
new Random(42).nextBytes(originalData); // Use seed for reproducibility
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(originalData);
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
// Read file in chunks to handle large files properly
byte[] readData = new byte[fileSize];
int totalRead = 0;
int bytesRead;
byte[] buffer = new byte[8192]; // Read in 8KB chunks
-
+
while ((bytesRead = inputStream.read(buffer)) > 0) {
System.arraycopy(buffer, 0, readData, totalRead, bytesRead);
totalRead += bytesRead;
}
inputStream.close();
-
+
assertEquals("Should read all bytes", fileSize, totalRead);
assertArrayEquals("Content should match", originalData, readData);
}
@@ -151,31 +148,30 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/chunked.txt";
- String[] chunks = {"First chunk. ", "Second chunk. ", "Third chunk."};
-
+ String[] chunks = { "First chunk. ", "Second chunk. ", "Third chunk." };
+
// Write file in chunks
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
for (String chunk : chunks) {
outputStream.write(chunk.getBytes(StandardCharsets.UTF_8));
}
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
String expected = String.join("", chunks);
String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
-
+
assertEquals("Content should match", expected, actual);
}
@@ -185,31 +181,30 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/offset.txt";
String testContent = "0123456789ABCDEFGHIJ";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read with offset
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
inputStream.seek(10); // Skip first 10 bytes
-
+
byte[] buffer = new byte[10];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 10 bytes", 10, bytesRead);
- assertEquals("Should read from offset", "ABCDEFGHIJ",
- new String(buffer, StandardCharsets.UTF_8));
+ assertEquals("Should read from offset", "ABCDEFGHIJ",
+ new String(buffer, StandardCharsets.UTF_8));
}
@Test
@@ -218,32 +213,31 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/partial.txt";
String testContent = "The quick brown fox jumps over the lazy dog";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read partial
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
// Read only "quick brown"
inputStream.seek(4);
byte[] buffer = new byte[11];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 11 bytes", 11, bytesRead);
- assertEquals("Should read partial content", "quick brown",
- new String(buffer, StandardCharsets.UTF_8));
+ assertEquals("Should read partial content", "quick brown",
+ new String(buffer, StandardCharsets.UTF_8));
}
@Test
@@ -252,28 +246,27 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/empty.txt";
-
+
// Write empty file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read empty file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[100];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 0 bytes from empty file", -1, bytesRead);
}
@@ -283,32 +276,31 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/overwrite.txt";
String originalContent = "Original content";
String newContent = "New content that overwrites the original";
-
+
// Write original file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(originalContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Overwrite file
outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(newContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
assertEquals("Should have new content", newContent, actual);
}
@@ -319,23 +311,22 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/multireads.txt";
String testContent = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read in multiple small chunks
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
StringBuilder result = new StringBuilder();
byte[] buffer = new byte[5];
int bytesRead;
@@ -343,7 +334,7 @@ public class SeaweedStreamIntegrationTest {
result.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
inputStream.close();
-
+
assertEquals("Should read entire content", testContent, result.toString());
}
@@ -353,29 +344,28 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/binary.bin";
byte[] binaryData = new byte[256];
for (int i = 0; i < 256; i++) {
binaryData[i] = (byte) i;
}
-
+
// Write binary file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(binaryData);
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] readData = new byte[256];
int bytesRead = inputStream.read(readData);
inputStream.close();
-
+
assertEquals("Should read all bytes", 256, bytesRead);
assertArrayEquals("Binary data should match", binaryData, readData);
}
@@ -386,32 +376,132 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/flush.txt";
String testContent = "Content to flush";
-
+
// Write file with flush
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.flush(); // Explicitly flush
outputStream.close();
-
+
// Verify file was written
assertTrue("File should exist after flush", filerClient.exists(testPath));
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[testContent.length()];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
- assertEquals("Content should match", testContent,
- new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
+
+ assertEquals("Content should match", testContent,
+ new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
-}
+ /**
+ * Tests range reads similar to how Parquet reads column chunks.
+ * This simulates:
+ * 1. Seeking to specific offsets
+ * 2. Reading specific byte ranges
+ * 3. Verifying each read() call returns the correct number of bytes
+ *
+ * This test specifically addresses the bug where read() was returning 0
+ * for inline content or -1 prematurely for chunked reads.
+ */
+ @Test
+ public void testRangeReads() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ String testPath = TEST_ROOT + "/rangereads.dat";
+
+ // Create a 1275-byte file (similar to the Parquet file size that was failing)
+ byte[] testData = new byte[1275];
+ Random random = new Random(42); // Fixed seed for reproducibility
+ random.nextBytes(testData);
+
+ // Write file
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+ outputStream.write(testData);
+ outputStream.close();
+
+ // Read file entry
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ // Test 1: Read last 8 bytes (like reading Parquet footer length)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1267);
+ byte[] buffer = new byte[8];
+ int bytesRead = inputStream.read(buffer, 0, 8);
+ assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead);
+ assertArrayEquals("Content at offset 1267 should match",
+ Arrays.copyOfRange(testData, 1267, 1275), buffer);
+ inputStream.close();
+
+ // Test 2: Read large chunk in middle (like reading column data)
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(383);
+ buffer = new byte[884]; // Read bytes 383-1267
+ bytesRead = inputStream.read(buffer, 0, 884);
+ assertEquals("Should read 884 bytes at offset 383", 884, bytesRead);
+ assertArrayEquals("Content at offset 383 should match",
+ Arrays.copyOfRange(testData, 383, 1267), buffer);
+ inputStream.close();
+
+ // Test 3: Read from beginning (like reading Parquet magic bytes)
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ buffer = new byte[4];
+ bytesRead = inputStream.read(buffer, 0, 4);
+ assertEquals("Should read 4 bytes at offset 0", 4, bytesRead);
+ assertArrayEquals("Content at offset 0 should match",
+ Arrays.copyOfRange(testData, 0, 4), buffer);
+ inputStream.close();
+
+ // Test 4: Multiple sequential reads without seeking (like
+ // H2SeekableInputStream.readFully)
+ // This is the critical test case that was failing!
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1197); // Position where EOF was being returned prematurely
+
+ byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes
+ int totalRead = 0;
+ int offset = 0;
+ int remaining = 78;
+
+ // Simulate Parquet's H2SeekableInputStream.readFully() loop
+ while (remaining > 0) {
+ int read = inputStream.read(fullBuffer, offset, remaining);
+ if (read == -1) {
+ fail(String.format(
+ "Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)",
+ totalRead, remaining));
+ }
+ assertTrue("Each read() should return positive bytes", read > 0);
+ totalRead += read;
+ offset += read;
+ remaining -= read;
+ }
+
+ assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead);
+ assertArrayEquals("Content at offset 1197 should match",
+ Arrays.copyOfRange(testData, 1197, 1275), fullBuffer);
+ inputStream.close();
+
+ // Test 5: Read entire file in one go
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ byte[] allData = new byte[1275];
+ bytesRead = inputStream.read(allData, 0, 1275);
+ assertEquals("Should read entire 1275 bytes", 1275, bytesRead);
+ assertArrayEquals("Entire content should match", testData, allData);
+ inputStream.close();
+ }
+}
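
The read pattern these new tests guard against regressions in is, in essence, a readFully-style loop. Below is a minimal sketch using only the client calls shown in the test above (lookupEntry, getParentDirectory/getFileName, seek, and the positional read); the readRange helper name and its error handling are illustrative only and not part of this change.

```java
import java.io.IOException;

// Minimal sketch, assuming the SeaweedFS client classes exercised in the test above;
// the readRange helper itself is illustrative and not part of this change.
static byte[] readRange(FilerClient filerClient, String path, long offset, int length) throws IOException {
    FilerProto.Entry entry = filerClient.lookupEntry(
            SeaweedOutputStream.getParentDirectory(path),
            SeaweedOutputStream.getFileName(path));
    try (SeaweedInputStream in = new SeaweedInputStream(filerClient, path, entry)) {
        in.seek(offset);
        byte[] buf = new byte[length];
        int done = 0;
        // readFully-style loop: each read() may return fewer bytes than requested,
        // but it must not return 0 or -1 before the requested range is exhausted.
        while (done < length) {
            int n = in.read(buf, done, length - done);
            if (n <= 0) {
                throw new IOException("read returned " + n + " after " + done + " of " + length + " bytes");
            }
            done += n;
        }
        return buf;
    }
}
```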
diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml
index 5c0981eae..51b14e9ea 100644
--- a/other/java/examples/pom.xml
+++ b/other/java/examples/pom.xml
@@ -16,14 +16,14 @@
</dependency>
<dependency>
<groupId>com.seaweedfs</groupId>
- <artifactId>seaweedfs-hadoop2-client</artifactId>
+ <artifactId>seaweedfs-hadoop3-client</artifactId>
<version>3.80</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>2.10.2</version>
+ <version>3.4.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
diff --git a/other/java/hdfs-over-ftp/pom.xml b/other/java/hdfs-over-ftp/pom.xml
deleted file mode 100644
index 3f7e6c4b0..000000000
--- a/other/java/hdfs-over-ftp/pom.xml
+++ /dev/null
@@ -1,120 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>hdfs-over-ftp</groupId>
- <artifactId>hdfs-over-ftp</artifactId>
- <version>1.0</version>
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.4.3</version>
- </parent>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>2.9.2</version>
- </dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>2.10.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>3.4.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.ftpserver</groupId>
- <artifactId>ftpserver-core</artifactId>
- <version>1.1.1</version>
- </dependency>
- <dependency>
- <groupId>com.seaweedfs</groupId>
- <artifactId>seaweedfs-hadoop3-client</artifactId>
- <version>1.6.2</version>
- </dependency>
- </dependencies>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- <compilerArguments>
- <verbose />
- <bootclasspath>${java.home}/lib/rt.jar</bootclasspath>
- </compilerArguments>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.6</version>
- <configuration>
- <archive>
- <manifest>
- <mainClass>org.apache.hadoop.seaweed.ftp.ApplicationServer</mainClass>
- <addClasspath>true</addClasspath>
- <classpathPrefix>lib/</classpathPrefix>
- </manifest>
- <manifestEntries>
- <Class-Path>./</Class-Path>
- </manifestEntries>
- </archive>
- </configuration>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/main/resources/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java
deleted file mode 100644
index b8ef1d840..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/ApplicationServer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.hadoop.seaweed.ftp;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class ApplicationServer {
-
- public static void main(String[] args) {
- SpringApplication.run(ApplicationServer.class, args);
- }
-
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java
deleted file mode 100644
index 3c395493d..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/config/SwaggerConfig.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.config;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import springfox.documentation.builders.ApiInfoBuilder;
-import springfox.documentation.builders.PathSelectors;
-import springfox.documentation.builders.RequestHandlerSelectors;
-import springfox.documentation.spi.DocumentationType;
-import springfox.documentation.spring.web.plugins.Docket;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
-
-@Configuration
-@EnableSwagger2
-public class SwaggerConfig {
- @Bean
- public Docket createRestApi() {
- return new Docket(DocumentationType.SWAGGER_2)
- .pathMapping("/")
- .select()
- .apis(RequestHandlerSelectors.basePackage("org.apache.hadoop.seaweed.ftp"))
- .paths(PathSelectors.any())
- .build().apiInfo(new ApiInfoBuilder()
- .title("FTP API Doc")
- .version("1.0")
- .build());
- }
-} \ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java
deleted file mode 100644
index 7a5a4e74d..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/FtpManagerController.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.controller;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import org.apache.hadoop.seaweed.ftp.service.HFtpService;
-import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PutMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@RestController
-@RequestMapping("/manager")
-@Api(tags = "FTP operation management")
-public class FtpManagerController {
-
- private static Logger log = Logger.getLogger(FtpManagerController.class);
-
- @Autowired
- private HFtpService hdfsOverFtpServer;
-
- @GetMapping("/status")
- @ApiOperation("Get FTP service status")
- public Result status() {
- Map map = new HashMap<>();
- try {
- boolean status = hdfsOverFtpServer.statusServer();
- map.put("is_running", status);
- return new Result(true, map, "FTP service status retrieved successfully");
- }catch (Exception e) {
- log.error(e);
- map.put("is_running", false);
- return new Result(true, map, "FTP service status retrieved successfully");
- }
- }
-
- @PutMapping("/start")
- @ApiOperation("Start the FTP service")
- public Result start() {
- try {
- boolean status = hdfsOverFtpServer.statusServer();
- if(!status) {
- hdfsOverFtpServer.startServer();
- }
- return new Result(true, "FTP service started successfully");
- }catch (Exception e) {
- log.error(e);
- return new Result(false, "Failed to start the FTP service");
- }
- }
-
- @PutMapping("/stop")
- @ApiOperation("Stop the FTP service")
- public Result stop() {
- try {
- boolean status = hdfsOverFtpServer.statusServer();
- if(status) {
- hdfsOverFtpServer.stopServer();
- }
- return new Result(true, "FTP service stopped successfully");
- }catch (Exception e) {
- log.error(e);
- return new Result(false, "Failed to stop the FTP service");
- }
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java
deleted file mode 100644
index c4d2261b3..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/UserController.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.controller;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import org.apache.ftpserver.ftplet.User;
-import org.apache.ftpserver.usermanager.Md5PasswordEncryptor;
-import org.apache.ftpserver.usermanager.UserFactory;
-import org.apache.hadoop.seaweed.ftp.controller.vo.FtpUser;
-import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
-import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
-import org.apache.log4j.Logger;
-import org.springframework.web.bind.annotation.*;
-
-import java.io.File;
-
-@RestController
-@RequestMapping("/user")
-@Api(tags = "FTP user management")
-public class UserController {
-
- private static Logger log = Logger.getLogger(UserController.class);
-
- /***
- * {
- * "name": "test",
- * "password": "test",
- * "homeDirectory": "/buckets/test/"
- * }
- * @param ftpUser
- * @return
- */
- @PostMapping("/add")
- @ApiOperation("Add or edit a user")
- public Result add(@RequestBody FtpUser ftpUser) {
- try {
- HdfsUserManager userManagerFactory = new HdfsUserManager();
- userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
- userManagerFactory.setPasswordEncryptor(new Md5PasswordEncryptor());
-
- UserFactory userFactory = new UserFactory();
- userFactory.setHomeDirectory(ftpUser.getHomeDirectory());
- userFactory.setName(ftpUser.getName());
- userFactory.setPassword(ftpUser.getPassword());
- userFactory.setEnabled(ftpUser.isEnabled());
- userFactory.setMaxIdleTime(ftpUser.getMaxIdleTime());
-
- User user = userFactory.createUser();
- userManagerFactory.save(user, ftpUser.isRenamePush());
- return new Result(true, "User created successfully");
- }catch (Exception e) {
- log.error(e);
- return new Result(false, "Failed to create user");
- }
- }
-
- @DeleteMapping("/delete/{user}")
- @ApiOperation("Delete a user")
- public Result delete(@PathVariable(value = "user") String user) {
- try {
- HdfsUserManager userManagerFactory = new HdfsUserManager();
- userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
- userManagerFactory.delete(user);
- return new Result(true, "User deleted successfully");
- }catch (Exception e) {
- log.error(e);
- return new Result(false, "Failed to delete user");
- }
- }
-
- @GetMapping("/show/{userName}")
- @ApiOperation("Show a user")
- public Result show(@PathVariable(value = "userName") String userName) {
- try {
- HdfsUserManager userManagerFactory = new HdfsUserManager();
- userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
- User user = userManagerFactory.getUserByName(userName);
- FtpUser ftpUser = new FtpUser(user.getHomeDirectory(), user.getPassword(), user.getEnabled(), user.getName(), user.getMaxIdleTime(), HdfsUserManager.getUserRenamePush(userName));
- return new Result(true, ftpUser, "User information retrieved successfully");
- }catch (Exception e) {
- log.error(e);
- return new Result(false, "Failed to retrieve user information");
- }
- }
-
- @GetMapping("/list")
- @ApiOperation("List users")
- public Result list() {
- try {
- HdfsUserManager userManagerFactory = new HdfsUserManager();
- userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
- String[] allUserNames = userManagerFactory.getAllUserNames();
- return new Result(true, allUserNames, "Users listed successfully");
- }catch (Exception e) {
- log.error(e);
- return new Result(false, "Failed to list users");
- }
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java
deleted file mode 100644
index 953d08603..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/FtpUser.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.controller.vo;
-
-public class FtpUser {
-
- private String homeDirectory;
- private String password;
- private boolean enabled;
- private String name;
- private int maxIdleTime;
- private boolean renamePush;
-
- public FtpUser() {
- }
-
- public FtpUser(String homeDirectory, String password, boolean enabled, String name, int maxIdleTime, boolean renamePush) {
- this.homeDirectory = homeDirectory;
- this.password = password;
- this.enabled = enabled;
- this.name = name;
- this.maxIdleTime = maxIdleTime;
- this.renamePush = renamePush;
- }
-
- public String getHomeDirectory() {
- return homeDirectory;
- }
-
- public void setHomeDirectory(String homeDirectory) {
- this.homeDirectory = homeDirectory;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public boolean isEnabled() {
- return enabled;
- }
-
- public void setEnabled(boolean enabled) {
- this.enabled = enabled;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getMaxIdleTime() {
- return maxIdleTime;
- }
-
- public void setMaxIdleTime(int maxIdleTime) {
- this.maxIdleTime = maxIdleTime;
- }
-
- public boolean isRenamePush() {
- return renamePush;
- }
-
- public void setRenamePush(boolean renamePush) {
- this.renamePush = renamePush;
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java
deleted file mode 100644
index b6a480ba7..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/controller/vo/Result.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.controller.vo;
-
-public class Result {
-
- private boolean status;
- private Object data;
- private String message;
-
- public Result(boolean status, String message) {
- this.status = status;
- this.message = message;
- }
-
- public Result(boolean status, Object data, String message) {
- this.status = status;
- this.message = message;
- this.data = data;
- }
-
- public boolean isStatus() {
- return status;
- }
-
- public void setStatus(boolean status) {
- this.status = status;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public Object getData() {
- return data;
- }
-
- public void setData(Object data) {
- this.data = data;
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java
deleted file mode 100644
index 9fe5dfd95..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/HFtpService.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.service;
-
-import org.apache.ftpserver.DataConnectionConfiguration;
-import org.apache.ftpserver.DataConnectionConfigurationFactory;
-import org.apache.ftpserver.FtpServer;
-import org.apache.ftpserver.FtpServerFactory;
-import org.apache.ftpserver.command.CommandFactoryFactory;
-import org.apache.ftpserver.listener.ListenerFactory;
-import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsFileSystemManager;
-import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsOverFtpSystem;
-import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import java.io.File;
-
-/**
- * reference: https://github.com/AShiou/hof
- */
-@Component
-public class HFtpService {
-
- private static Logger log = Logger.getLogger(HFtpService.class);
-
- @Value("${ftp.port}")
- private int port = 0;
-
- @Value("${ftp.passive-address}")
- private String passiveAddress;
-
- @Value("${ftp.passive-ports}")
- private String passivePorts;
-
- @Value("${hdfs.uri}")
- private String hdfsUri;
-
- @Value("${seaweedFs.enable}")
- private boolean seaweedFsEnable;
-
- @Value("${seaweedFs.access}")
- private String seaweedFsAccess;
-
- @Value("${seaweedFs.replication}")
- private String seaweedFsReplication;
-
- private FtpServer ftpServer = null;
-
- public void startServer() throws Exception {
- log.info("Starting HDFS-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
-
- HdfsOverFtpSystem.setHdfsUri(hdfsUri);
- HdfsOverFtpSystem.setSeaweedFsEnable(seaweedFsEnable);
- HdfsOverFtpSystem.setSeaweedFsAccess(seaweedFsAccess);
- HdfsOverFtpSystem.setSeaweedFsReplication(seaweedFsReplication);
-
- FtpServerFactory server = new FtpServerFactory();
- server.setFileSystem(new HdfsFileSystemManager());
-
- ListenerFactory factory = new ListenerFactory();
- factory.setPort(port);
-
- DataConnectionConfigurationFactory dccFactory = new DataConnectionConfigurationFactory();
- dccFactory.setPassiveAddress("0.0.0.0");
- dccFactory.setPassivePorts(passivePorts);
- dccFactory.setPassiveExternalAddress(passiveAddress);
- DataConnectionConfiguration dcc = dccFactory.createDataConnectionConfiguration();
- factory.setDataConnectionConfiguration(dcc);
-
- server.addListener("default", factory.createListener());
-
- HdfsUserManager userManager = new HdfsUserManager();
- final File file = loadResource("/users.properties");
- userManager.setFile(file);
- server.setUserManager(userManager);
-
- CommandFactoryFactory cmFact = new CommandFactoryFactory();
- cmFact.setUseDefaultCommands(true);
- server.setCommandFactory(cmFact.createCommandFactory());
-
- // start the server
- ftpServer = server.createServer();
- ftpServer.start();
- }
-
- public void stopServer() {
- log.info("Stopping Hdfs-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
- ftpServer.stop();
- }
-
- public boolean statusServer() {
- try {
- return !ftpServer.isStopped();
- }catch (Exception e) {
- return false;
- }
- }
-
- private static File loadResource(String resourceName) {
- return new File(System.getProperty("user.dir") + resourceName);
- }
-} \ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java
deleted file mode 100644
index e97c2dc14..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileObject.java
+++ /dev/null
@@ -1,333 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.service.filesystem;
-
-import org.apache.ftpserver.ftplet.FtpFile;
-import org.apache.ftpserver.ftplet.User;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.seaweed.ftp.users.HdfsUser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This class implements all actions to HDFS
- */
-public class HdfsFileObject implements FtpFile {
-
- private final Logger log = LoggerFactory.getLogger(HdfsFileObject.class);
-
- private Path homePath;
- private Path path;
- private Path fullPath;
- private HdfsUser user;
-
- /**
- * Constructs HdfsFileObject from path
- *
- * @param path path to represent object
- * @param user accessor of the object
- */
- public HdfsFileObject(String homePath, String path, User user) {
- this.homePath = new Path(homePath);
- this.path = new Path(path);
- this.fullPath = new Path(homePath + path);
- this.user = (HdfsUser) user;
- }
-
- public String getAbsolutePath() {
- // strip the last '/' if necessary
- String fullName = path.toString();
- int filelen = fullName.length();
- if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
- fullName = fullName.substring(0, filelen - 1);
- }
-
- return fullName;
- }
-
- public String getName() {
- return path.getName();
- }
-
- /**
- * HDFS has no hidden objects
- *
- * @return always false
- */
- public boolean isHidden() {
- return false;
- }
-
- /**
- * Checks if the object is a directory
- *
- * @return true if the object is a directory
- */
- public boolean isDirectory() {
- try {
- log.debug("is directory? : " + fullPath);
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FileStatus fs = dfs.getFileStatus(fullPath);
- return fs.isDir();
- } catch (IOException e) {
- log.debug(fullPath + " is not dir", e);
- return false;
- }
- }
-
- /**
- * Checks if the object is a file
- *
- * @return true if the object is a file
- */
- public boolean isFile() {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- return dfs.isFile(fullPath);
- } catch (IOException e) {
- log.debug(fullPath + " is not file", e);
- return false;
- }
- }
-
- /**
- * Checks if the object does exist
- *
- * @return true if the object does exist
- */
- public boolean doesExist() {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- dfs.getFileStatus(fullPath);
- return true;
- } catch (IOException e) {
- // log.debug(path + " does not exist", e);
- return false;
- }
- }
-
- public boolean isReadable() {
- return true;
- }
-
- public boolean isWritable() {
- return true;
- }
-
- public boolean isRemovable() {
- return true;
- }
-
- /**
- * Get owner of the object
- *
- * @return owner of the object
- */
- public String getOwnerName() {
- return "root";
- /*
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FileStatus fs = dfs.getFileStatus(fullPath);
- String owner = fs.getOwner();
- if(owner.length() == 0) {
- return "root";
- }
- return owner;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- */
- }
-
- /**
- * Get group of the object
- *
- * @return group of the object
- */
- public String getGroupName() {
- return "root";
- /*
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FileStatus fs = dfs.getFileStatus(fullPath);
- String group = fs.getGroup();
- if(group.length() == 0) {
- return "root";
- }
- return group;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- */
- }
-
- /**
- * Get link count
- *
- * @return 3 is for a directory and 1 is for a file
- */
- public int getLinkCount() {
- return isDirectory() ? 3 : 1;
- }
-
- /**
- * Get last modification date
- *
- * @return last modification date as a long
- */
- public long getLastModified() {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FileStatus fs = dfs.getFileStatus(fullPath);
- return fs.getModificationTime();
- } catch (IOException e) {
- e.printStackTrace();
- return 0;
- }
- }
-
- public boolean setLastModified(long l) {
- return false;
- }
-
- /**
- * Get a size of the object
- *
- * @return size of the object in bytes
- */
- public long getSize() {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FileStatus fs = dfs.getFileStatus(fullPath);
- log.debug("getSize(): " + fullPath + " : " + fs.getLen());
- return fs.getLen();
- } catch (IOException e) {
- e.printStackTrace();
- return 0;
- }
- }
-
- public Object getPhysicalFile() {
- return null;
- }
-
- /**
- * Create a new dir from the object
- *
- * @return true if dir is created
- */
- public boolean mkdir() {
- try {
- FileSystem fs = HdfsOverFtpSystem.getDfs();
- fs.mkdirs(fullPath);
-// fs.setOwner(path, user.getName(), user.getMainGroup());
- return true;
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /**
- * Delete object from the HDFS filesystem
- *
- * @return true if the object is deleted
- */
- public boolean delete() {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- dfs.delete(fullPath, true);
- return true;
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- }
-
- public boolean move(FtpFile ftpFile) {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- dfs.rename(fullPath, new Path(fullPath.getParent() + File.separator + ftpFile.getName()));
- return true;
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- }
-
-
- /**
- * List files of the directory
- *
- * @return List of files in the directory
- */
- public List<FtpFile> listFiles() {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FileStatus fileStats[] = dfs.listStatus(fullPath);
-
- // get the virtual name of the base directory
- String virtualFileStr = getAbsolutePath();
- if (virtualFileStr.charAt(virtualFileStr.length() - 1) != '/') {
- virtualFileStr += '/';
- }
-
- FtpFile[] virtualFiles = new FtpFile[fileStats.length];
- for (int i = 0; i < fileStats.length; i++) {
- File fileObj = new File(fileStats[i].getPath().toString());
- String fileName = virtualFileStr + fileObj.getName();
- virtualFiles[i] = new HdfsFileObject(homePath.toString(), fileName, user);
- }
- return Collections.unmodifiableList(Arrays.asList(virtualFiles));
- } catch (IOException e) {
- log.debug("", e);
- return null;
- }
- }
-
- /**
- * Creates output stream to write to the object
- *
- * @param l is not used here
- * @return OutputStream
- * @throws IOException
- */
- public OutputStream createOutputStream(long l) {
- try {
- FileSystem fs = HdfsOverFtpSystem.getDfs();
- FSDataOutputStream out = fs.create(fullPath);
-// fs.setOwner(fullPath, user.getName(), user.getMainGroup());
- return out;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- }
-
- /**
- * Creates input stream to read from the object
- *
- * @param l is not used here
- * @return InputStream
- * @throws IOException
- */
- public InputStream createInputStream(long l) {
- try {
- FileSystem dfs = HdfsOverFtpSystem.getDfs();
- FSDataInputStream in = dfs.open(fullPath);
- return in;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java
deleted file mode 100644
index 533c2c3aa..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemManager.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.service.filesystem;
-
-import org.apache.ftpserver.ftplet.FileSystemFactory;
-import org.apache.ftpserver.ftplet.FileSystemView;
-import org.apache.ftpserver.ftplet.User;
-
-/**
- * Implemented FileSystemManager to use HdfsFileSystemView
- */
-public class HdfsFileSystemManager implements FileSystemFactory {
- public FileSystemView createFileSystemView(User user) {
- return new HdfsFileSystemView(user);
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java
deleted file mode 100644
index 8b910e775..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsFileSystemView.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.service.filesystem;
-
-import org.apache.ftpserver.ftplet.FileSystemView;
-import org.apache.ftpserver.ftplet.FtpFile;
-import org.apache.ftpserver.ftplet.User;
-import org.apache.hadoop.fs.Path;
-
-import java.io.File;
-
-/**
- * Implemented FileSystemView to use HdfsFileObject
- */
-public class HdfsFileSystemView implements FileSystemView {
-
- private String homePath;
- private String currPath = File.separator;
- private User user;
-
- /**
- * Constructor - set the user object.
- */
- protected HdfsFileSystemView(User user) {
- if (user == null) {
- throw new IllegalArgumentException("user can not be null");
- }
- if (user.getHomeDirectory() == null) {
- throw new IllegalArgumentException(
- "User home directory can not be null");
- }
-
- this.homePath = user.getHomeDirectory();
- this.user = user;
- }
-
- public FtpFile getHomeDirectory() {
- return new HdfsFileObject(homePath, File.separator, user);
- }
-
- public FtpFile getWorkingDirectory() {
- FtpFile fileObj;
- if (currPath.equals(File.separator)) {
- fileObj = new HdfsFileObject(homePath, File.separator, user);
- } else {
- fileObj = new HdfsFileObject(homePath, currPath, user);
-
- }
- return fileObj;
- }
-
- public boolean changeWorkingDirectory(String dir) {
-
- Path path;
- if (dir.startsWith(File.separator) || new Path(currPath).equals(new Path(dir))) {
- path = new Path(dir);
- } else if (currPath.length() > 1) {
- path = new Path(currPath + File.separator + dir);
- } else {
- if(dir.startsWith("/")) {
- path = new Path(dir);
- }else {
- path = new Path(File.separator + dir);
- }
- }
-
- // Prevent navigating back above the root directory
- if (path.getName().equals("..")) {
- path = new Path(File.separator);
- }
-
- HdfsFileObject file = new HdfsFileObject(homePath, path.toString(), user);
- if (file.isDirectory()) {
- currPath = path.toString();
- return true;
- } else {
- return false;
- }
- }
-
- public FtpFile getFile(String file) {
- String path;
- if (file.startsWith(File.separator)) {
- path = file;
- } else if (currPath.length() > 1) {
- path = currPath + File.separator + file;
- } else {
- path = File.separator + file;
- }
- return new HdfsFileObject(homePath, path, user);
- }
-
- /**
- * Is the file content random accessible?
- */
- public boolean isRandomAccessible() {
- return true;
- }
-
- /**
- * Dispose file system view - does nothing.
- */
- public void dispose() {
- }
-
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java
deleted file mode 100644
index 149fd6857..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/service/filesystem/HdfsOverFtpSystem.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.service.filesystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Class to store DFS connection
- */
-public class HdfsOverFtpSystem {
-
- private static FileSystem fs = null;
-
- private static String hdfsUri;
-
- private static boolean seaweedFsEnable;
-
- private static String seaweedFsAccess;
-
- private static String seaweedFsReplication;
-
- private final static Logger log = LoggerFactory.getLogger(HdfsOverFtpSystem.class);
-
- private static void hdfsInit() throws IOException {
- Configuration configuration = new Configuration();
-
- configuration.set("fs.defaultFS", hdfsUri);
- if(seaweedFsEnable) {
- configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
- configuration.set("fs.seaweed.volume.server.access", seaweedFsAccess);
- configuration.set("fs.seaweed.replication", seaweedFsReplication);
- }
- fs = FileSystem.get(configuration);
- log.info("HDFS load success");
- }
-
- /**
- * Get dfs
- *
- * @return dfs
- * @throws IOException
- */
- public static FileSystem getDfs() throws IOException {
- if (fs == null) {
- hdfsInit();
- }
- return fs;
- }
-
- public static void setHdfsUri(String hdfsUri) {
- HdfsOverFtpSystem.hdfsUri = hdfsUri;
- }
-
- public static String getHdfsUri() {
- return hdfsUri;
- }
-
- public static void setSeaweedFsEnable(boolean seaweedFsEnable) {
- HdfsOverFtpSystem.seaweedFsEnable = seaweedFsEnable;
- }
-
- public static void setSeaweedFsAccess(String seaweedFsAccess) {
- HdfsOverFtpSystem.seaweedFsAccess = seaweedFsAccess;
- }
-
- public static void setSeaweedFsReplication(String seaweedFsReplication) {
- HdfsOverFtpSystem.seaweedFsReplication = seaweedFsReplication;
- }
-} \ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java
deleted file mode 100644
index c82f6516f..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUser.java
+++ /dev/null
@@ -1,239 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.users;
-
-import org.apache.ftpserver.ftplet.Authority;
-import org.apache.ftpserver.ftplet.AuthorizationRequest;
-import org.apache.ftpserver.ftplet.User;
-import org.apache.log4j.Logger;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class HdfsUser implements User, Serializable {
-
- private static final long serialVersionUID = -47371353779731294L;
-
- private String name = null;
-
- private String password = null;
-
- private int maxIdleTimeSec = 0; // no limit
-
- private String homeDir = null;
-
- private boolean isEnabled = true;
-
- private List<? extends Authority> authorities = new ArrayList<Authority>();
-
- private ArrayList<String> groups = new ArrayList<String>();
-
- private Logger log = Logger.getLogger(HdfsUser.class);
-
- /**
- * Default constructor.
- */
- public HdfsUser() {
- }
-
- /**
- * Copy constructor.
- */
- public HdfsUser(User user) {
- name = user.getName();
- password = user.getPassword();
- authorities = user.getAuthorities();
- maxIdleTimeSec = user.getMaxIdleTime();
- homeDir = user.getHomeDirectory();
- isEnabled = user.getEnabled();
- }
-
- public ArrayList<String> getGroups() {
- return groups;
- }
-
- /**
- * Get the main group of the user
- *
- * @return main group of the user
- */
- public String getMainGroup() {
- if (groups.size() > 0) {
- return groups.get(0);
- } else {
- log.error("User " + name + " is not a member of any group");
- return "error";
- }
- }
-
- /**
- * Checks if user is a member of the group
- *
- * @param group to check
- * @return true if the user is a member of the group
- */
- public boolean isGroupMember(String group) {
- for (String userGroup : groups) {
- if (userGroup.equals(group)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Set users' groups
- *
- * @param groups to set
- */
- public void setGroups(ArrayList<String> groups) {
- if (groups.size() < 1) {
- log.error("User " + name + " is not a member of any group");
- }
- this.groups = groups;
- }
-
- /**
- * Get the user name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Set user name.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Get the user password.
- */
- public String getPassword() {
- return password;
- }
-
- /**
- * Set user password.
- */
- public void setPassword(String pass) {
- password = pass;
- }
-
- public List<Authority> getAuthorities() {
- if (authorities != null) {
- return Collections.unmodifiableList(authorities);
- } else {
- return null;
- }
- }
-
- public void setAuthorities(List<Authority> authorities) {
- if (authorities != null) {
- this.authorities = Collections.unmodifiableList(authorities);
- } else {
- this.authorities = null;
- }
- }
-
- /**
- * Get the maximum idle time in seconds.
- */
- public int getMaxIdleTime() {
- return maxIdleTimeSec;
- }
-
- /**
- * Set the maximum idle time in seconds.
- */
- public void setMaxIdleTime(int idleSec) {
- maxIdleTimeSec = idleSec;
- if (maxIdleTimeSec < 0) {
- maxIdleTimeSec = 0;
- }
- }
-
- /**
- * Get the user enable status.
- */
- public boolean getEnabled() {
- return isEnabled;
- }
-
- /**
- * Set the user enable status.
- */
- public void setEnabled(boolean enb) {
- isEnabled = enb;
- }
-
- /**
- * Get the user home directory.
- */
- public String getHomeDirectory() {
- return homeDir;
- }
-
- /**
- * Set the user home directory.
- */
- public void setHomeDirectory(String home) {
- homeDir = home;
- }
-
- /**
- * String representation.
- */
- public String toString() {
- return name;
- }
-
- /**
- * {@inheritDoc}
- */
- public AuthorizationRequest authorize(AuthorizationRequest request) {
- List<Authority> authorities = getAuthorities();
-
- // check for no authorities at all
- if (authorities == null) {
- return null;
- }
-
- boolean someoneCouldAuthorize = false;
- for (Authority authority : authorities) {
- if (authority.canAuthorize(request)) {
- someoneCouldAuthorize = true;
-
- request = authority.authorize(request);
-
- // authorization failed, return null
- if (request == null) {
- return null;
- }
- }
-
- }
-
- if (someoneCouldAuthorize) {
- return request;
- } else {
- return null;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public List<Authority> getAuthorities(Class<? extends Authority> clazz) {
- List<Authority> selected = new ArrayList<Authority>();
-
- for (Authority authority : authorities) {
- if (authority.getClass().equals(clazz)) {
- selected.add(authority);
- }
- }
-
- return selected;
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java b/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java
deleted file mode 100644
index 7eb296160..000000000
--- a/other/java/hdfs-over-ftp/src/main/java/org/apache/hadoop/seaweed/ftp/users/HdfsUserManager.java
+++ /dev/null
@@ -1,453 +0,0 @@
-package org.apache.hadoop.seaweed.ftp.users;
-
-import org.apache.ftpserver.FtpServerConfigurationException;
-import org.apache.ftpserver.ftplet.*;
-import org.apache.ftpserver.usermanager.*;
-import org.apache.ftpserver.usermanager.impl.*;
-import org.apache.ftpserver.util.BaseProperties;
-import org.apache.ftpserver.util.IoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-public class HdfsUserManager extends AbstractUserManager {
-
- private final Logger LOG = LoggerFactory
- .getLogger(HdfsUserManager.class);
-
- private final static String DEPRECATED_PREFIX = "FtpServer.user.";
-
- private final static String PREFIX = "ftpserver.user.";
-
- private static BaseProperties userDataProp;
-
- private File userDataFile = new File("users.conf");
-
- private boolean isConfigured = false;
-
- private PasswordEncryptor passwordEncryptor = new Md5PasswordEncryptor();
-
-
- /**
- * Retrieve the file used to load and store users
- *
- * @return The file
- */
- public File getFile() {
- return userDataFile;
- }
-
- /**
- * Set the file used to store and read users. Must be set before
- * {@link #configure()} is called.
- *
- * @param propFile A file containing users
- */
- public void setFile(File propFile) {
- if (isConfigured) {
- throw new IllegalStateException("Must be called before configure()");
- }
-
- this.userDataFile = propFile;
- }
-
-
- /**
- * Retrieve the password encryptor used for this user manager
- *
- * @return The password encryptor. Default to {@link Md5PasswordEncryptor}
- * if no other has been provided
- */
- public PasswordEncryptor getPasswordEncryptor() {
- return passwordEncryptor;
- }
-
-
- /**
- * Set the password encryptor to use for this user manager
- *
- * @param passwordEncryptor The password encryptor
- */
- public void setPasswordEncryptor(PasswordEncryptor passwordEncryptor) {
- this.passwordEncryptor = passwordEncryptor;
- }
-
-
- /**
- * Lazy init the user manager
- */
- private void lazyInit() {
- if (!isConfigured) {
- configure();
- }
- }
-
- /**
- * Configure user manager.
- */
- public void configure() {
- isConfigured = true;
- try {
- userDataProp = new BaseProperties();
-
- if (userDataFile != null && userDataFile.exists()) {
- FileInputStream fis = null;
- try {
- fis = new FileInputStream(userDataFile);
- userDataProp.load(fis);
- } finally {
- IoUtils.close(fis);
- }
- }
- } catch (IOException e) {
- throw new FtpServerConfigurationException(
- "Error loading user data file : "
- + userDataFile.getAbsolutePath(), e);
- }
-
- convertDeprecatedPropertyNames();
- }
-
- private void convertDeprecatedPropertyNames() {
- Enumeration<?> keys = userDataProp.propertyNames();
-
- boolean doSave = false;
-
- while (keys.hasMoreElements()) {
- String key = (String) keys.nextElement();
-
- if (key.startsWith(DEPRECATED_PREFIX)) {
- String newKey = PREFIX
- + key.substring(DEPRECATED_PREFIX.length());
- userDataProp.setProperty(newKey, userDataProp.getProperty(key));
- userDataProp.remove(key);
-
- doSave = true;
- }
- }
-
- if (doSave) {
- try {
- saveUserData();
- } catch (FtpException e) {
- throw new FtpServerConfigurationException(
- "Failed to save updated user data", e);
- }
- }
- }
-
- public synchronized void save(User usr, boolean renamePush) throws FtpException {
- lazyInit();
- userDataProp.setProperty(PREFIX + usr.getName() + ".rename.push", renamePush);
- save(usr);
- }
-
- /**
- * Save user data. Store the properties.
- */
- public synchronized void save(User usr) throws FtpException {
- lazyInit();
-
- // null value check
- if (usr.getName() == null) {
- throw new NullPointerException("User name is null.");
- }
- String thisPrefix = PREFIX + usr.getName() + '.';
-
- // set other properties
- userDataProp.setProperty(thisPrefix + ATTR_PASSWORD, getPassword(usr));
-
- String home = usr.getHomeDirectory();
- if (home == null) {
- home = "/";
- }
- userDataProp.setProperty(thisPrefix + ATTR_HOME, home);
- userDataProp.setProperty(thisPrefix + ATTR_ENABLE, usr.getEnabled());
- userDataProp.setProperty(thisPrefix + ATTR_WRITE_PERM, usr
- .authorize(new WriteRequest()) != null);
- userDataProp.setProperty(thisPrefix + ATTR_MAX_IDLE_TIME, usr
- .getMaxIdleTime());
-
- TransferRateRequest transferRateRequest = new TransferRateRequest();
- transferRateRequest = (TransferRateRequest) usr
- .authorize(transferRateRequest);
-
- if (transferRateRequest != null) {
- userDataProp.setProperty(thisPrefix + ATTR_MAX_UPLOAD_RATE,
- transferRateRequest.getMaxUploadRate());
- userDataProp.setProperty(thisPrefix + ATTR_MAX_DOWNLOAD_RATE,
- transferRateRequest.getMaxDownloadRate());
- } else {
- userDataProp.remove(thisPrefix + ATTR_MAX_UPLOAD_RATE);
- userDataProp.remove(thisPrefix + ATTR_MAX_DOWNLOAD_RATE);
- }
-
- // request that always will succeed
- ConcurrentLoginRequest concurrentLoginRequest = new ConcurrentLoginRequest(
- 0, 0);
- concurrentLoginRequest = (ConcurrentLoginRequest) usr
- .authorize(concurrentLoginRequest);
-
- if (concurrentLoginRequest != null) {
- userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_NUMBER,
- concurrentLoginRequest.getMaxConcurrentLogins());
- userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_PER_IP,
- concurrentLoginRequest.getMaxConcurrentLoginsPerIP());
- } else {
- userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_NUMBER);
- userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_PER_IP);
- }
-
- saveUserData();
- }
-
- /**
- * @throws FtpException
- */
- private void saveUserData() throws FtpException {
- File dir = userDataFile.getAbsoluteFile().getParentFile();
- if (dir != null && !dir.exists() && !dir.mkdirs()) {
- String dirName = dir.getAbsolutePath();
- throw new FtpServerConfigurationException(
- "Cannot create directory for user data file : " + dirName);
- }
-
- // save user data
- FileOutputStream fos = null;
- try {
- fos = new FileOutputStream(userDataFile);
- userDataProp.store(fos, "Generated file - don't edit (please)");
- } catch (IOException ex) {
- LOG.error("Failed saving user data", ex);
- throw new FtpException("Failed saving user data", ex);
- } finally {
- IoUtils.close(fos);
- }
- }
-
-
- public synchronized void list() throws FtpException {
- lazyInit();
-
- Map dataMap = new HashMap();
- Enumeration<String> propNames = (Enumeration<String>) userDataProp.propertyNames();
- ArrayList<String> a = Collections.list(propNames);
- a.remove("i18nMap"); // remove the i18nMap entry
- for(String attrName : a){
-// dataMap.put(attrName, propNames.);
- }
-
- }
-
- /**
- * Delete a user. Removes all of this user's entries from the properties,
- * then saves the updated data.
- */
- public synchronized void delete(String usrName) throws FtpException {
- lazyInit();
-
- // remove entries from properties
- String thisPrefix = PREFIX + usrName + '.';
- Enumeration<?> propNames = userDataProp.propertyNames();
- ArrayList<String> remKeys = new ArrayList<String>();
- while (propNames.hasMoreElements()) {
- String thisKey = propNames.nextElement().toString();
- if (thisKey.startsWith(thisPrefix)) {
- remKeys.add(thisKey);
- }
- }
- Iterator<String> remKeysIt = remKeys.iterator();
- while (remKeysIt.hasNext()) {
- userDataProp.remove(remKeysIt.next());
- }
-
- saveUserData();
- }
-
- /**
- * Get user password. Returns the encrypted value.
- * <p/>
- * <pre>
- * If the password value is not null
- * password = new password
- * else
- * if user does exist
- * password = old password
- * else
- * password = &quot;&quot;
- * </pre>
- */
- private String getPassword(User usr) {
- String name = usr.getName();
- String password = usr.getPassword();
-
- if (password != null) {
- password = passwordEncryptor.encrypt(password);
- } else {
- String blankPassword = passwordEncryptor.encrypt("");
-
- if (doesExist(name)) {
- String key = PREFIX + name + '.' + ATTR_PASSWORD;
- password = userDataProp.getProperty(key, blankPassword);
- } else {
- password = blankPassword;
- }
- }
- return password;
- }
-
- /**
- * Get all user names.
- */
- public synchronized String[] getAllUserNames() {
- lazyInit();
-
- // get all user names
- String suffix = '.' + ATTR_HOME;
- ArrayList<String> ulst = new ArrayList<String>();
- Enumeration<?> allKeys = userDataProp.propertyNames();
- int prefixlen = PREFIX.length();
- int suffixlen = suffix.length();
- while (allKeys.hasMoreElements()) {
- String key = (String) allKeys.nextElement();
- if (key.endsWith(suffix)) {
- String name = key.substring(prefixlen);
- int endIndex = name.length() - suffixlen;
- name = name.substring(0, endIndex);
- ulst.add(name);
- }
- }
-
- Collections.sort(ulst);
- return ulst.toArray(new String[0]);
- }
-
- private ArrayList<String> parseGroups(String groupsLine) {
- String groupsArray[] = groupsLine.split(",");
- return new ArrayList(Arrays.asList(groupsArray));
- }
-
- public static synchronized boolean getUserRenamePush(String userName) {
- return userDataProp.getBoolean(PREFIX + userName + ".rename.push", false);
- }
-
- /**
- * Load user data.
- */
- public synchronized User getUserByName(String userName) {
- lazyInit();
-
- if (!doesExist(userName)) {
- return null;
- }
-
- String baseKey = PREFIX + userName + '.';
- HdfsUser user = new HdfsUser();
- user.setName(userName);
- user.setEnabled(userDataProp.getBoolean(baseKey + ATTR_ENABLE, true));
- user.setHomeDirectory(userDataProp
- .getProperty(baseKey + ATTR_HOME, "/"));
-
-// user.setGroups(parseGroups(userDataProp
-// .getProperty(baseKey + "groups")));
-
- List<Authority> authorities = new ArrayList<Authority>();
-
- if (userDataProp.getBoolean(baseKey + ATTR_WRITE_PERM, false)) {
- authorities.add(new WritePermission());
- }
-
- int maxLogin = userDataProp.getInteger(baseKey + ATTR_MAX_LOGIN_NUMBER,
- 0);
- int maxLoginPerIP = userDataProp.getInteger(baseKey
- + ATTR_MAX_LOGIN_PER_IP, 0);
-
- authorities.add(new ConcurrentLoginPermission(maxLogin, maxLoginPerIP));
-
- int uploadRate = userDataProp.getInteger(
- baseKey + ATTR_MAX_UPLOAD_RATE, 0);
- int downloadRate = userDataProp.getInteger(baseKey
- + ATTR_MAX_DOWNLOAD_RATE, 0);
-
- authorities.add(new TransferRatePermission(downloadRate, uploadRate));
-
- user.setAuthorities(authorities);
-
- user.setMaxIdleTime(userDataProp.getInteger(baseKey
- + ATTR_MAX_IDLE_TIME, 0));
-
- return user;
- }
-
- /**
- * User existence check
- */
- public synchronized boolean doesExist(String name) {
- lazyInit();
-
- String key = PREFIX + name + '.' + ATTR_HOME;
- return userDataProp.containsKey(key);
- }
-
- /**
- * User authentication method
- */
- public synchronized User authenticate(Authentication authentication)
- throws AuthenticationFailedException {
- lazyInit();
-
- if (authentication instanceof UsernamePasswordAuthentication) {
- UsernamePasswordAuthentication upauth = (UsernamePasswordAuthentication) authentication;
-
- String user = upauth.getUsername();
- String password = upauth.getPassword();
-
- if (user == null) {
- throw new AuthenticationFailedException("Authentication failed");
- }
-
- if (password == null) {
- password = "";
- }
-
- String storedPassword = userDataProp.getProperty(PREFIX + user + '.'
- + ATTR_PASSWORD);
-
- if (storedPassword == null) {
- // user does not exist
- throw new AuthenticationFailedException("Authentication failed");
- }
-
- if (passwordEncryptor.matches(password, storedPassword)) {
- return getUserByName(user);
- } else {
- throw new AuthenticationFailedException("Authentication failed");
- }
-
- } else if (authentication instanceof AnonymousAuthentication) {
- if (doesExist("anonymous")) {
- return getUserByName("anonymous");
- } else {
- throw new AuthenticationFailedException("Authentication failed");
- }
- } else {
- throw new IllegalArgumentException(
- "Authentication not supported by this user manager");
- }
- }
-
- /**
- * Close the user manager - remove existing entries.
- */
- public synchronized void dispose() {
- if (userDataProp != null) {
- userDataProp.clear();
- userDataProp = null;
- }
- }
-}
diff --git a/other/java/hdfs-over-ftp/src/main/resources/application.yml b/other/java/hdfs-over-ftp/src/main/resources/application.yml
deleted file mode 100644
index 128bab1f9..000000000
--- a/other/java/hdfs-over-ftp/src/main/resources/application.yml
+++ /dev/null
@@ -1,15 +0,0 @@
-server:
- port: 8080
-
-ftp:
- port: 2222
- passive-address: localhost
- passive-ports: 30000-30999
-
-hdfs:
- uri: seaweedfs://localhost:8888
-
-seaweedFs:
- enable: true
- access: direct # direct/filerProxy/publicUrl
- replication: "000" \ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/src/main/resources/assembly.xml b/other/java/hdfs-over-ftp/src/main/resources/assembly.xml
deleted file mode 100644
index 84fef56f8..000000000
--- a/other/java/hdfs-over-ftp/src/main/resources/assembly.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
-
- <id>package</id>
- <formats>
- <!-- Packaging format; supported formats are zip, tar, tar.gz (or tgz), tar.bz2 (or tbz2), jar, dir and war; multiple formats may be specified at once -->
- <format>tar.gz</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
-
- <fileSets>
- <fileSet>
- <directory>src/main/resources</directory>
- <outputDirectory>/</outputDirectory>
- <includes>
- <include>application.yml</include>
- <include>logback-spring.xml</include>
- <include>users.properties</include>
- <include>kafka-producer.properties</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${project.build.directory}</directory>
- <outputDirectory>/</outputDirectory>
- <includes>
- <include>*.jar</include>
- </includes>
- </fileSet>
- </fileSets>
- <dependencySets>
- <dependencySet>
- <useProjectArtifact>false</useProjectArtifact>
- <outputDirectory>lib</outputDirectory>
- <scope>runtime</scope>
- <unpack>false</unpack>
- </dependencySet>
- </dependencySets>
-</assembly>
diff --git a/other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml b/other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml
deleted file mode 100644
index 96b4c1d71..000000000
--- a/other/java/hdfs-over-ftp/src/main/resources/logback-spring.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<configuration>
- <!-- Defines where log files are stored; do not use relative paths in the Logback configuration -->
- <property name="LOG_HOME" value="${user.dir}/logs/" />
-
- <!-- Console output -->
- <appender name="Stdout" class="ch.qos.logback.core.ConsoleAppender">
- <!-- Log output encoding -->
- <layout class="ch.qos.logback.classic.PatternLayout">
- <!-- Output format: %d = date, %thread = thread name, %-5level = level padded to 5 characters, %msg = log message, %n = newline -->
- <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
- </pattern>
- </layout>
- </appender>
-
- <!-- Generate a new log file each day -->
- <appender name="RollingFile"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <File>${LOG_HOME}/fileLog.log</File>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}/fileLog.log.%d.%i</fileNamePattern>
- <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>100 MB</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
- <encoder>
- <pattern>
- %d %p (%file:%line\)- %m%n
- </pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
-    <!-- Log output level -->
- <root level="info">
- <appender-ref ref="Stdout" />
- <appender-ref ref="RollingFile" />
- </root>
-
-</configuration>
\ No newline at end of file
diff --git a/other/java/hdfs-over-ftp/users.properties b/other/java/hdfs-over-ftp/users.properties
deleted file mode 100644
index aeeab8e35..000000000
--- a/other/java/hdfs-over-ftp/users.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-#Generated file - don't edit (please)
-#Thu Mar 11 19:11:12 CST 2021
-ftpserver.user.test.idletime=0
-ftpserver.user.test.maxloginperip=0
-ftpserver.user.test.userpassword=44664D4D827C740293D2AA244FB60445
-ftpserver.user.test.enableflag=true
-ftpserver.user.test.maxloginnumber=0
-ftpserver.user.test.rename.push=true
-ftpserver.user.test.homedirectory=/buckets/test/
-ftpserver.user.test.downloadrate=0
-ftpserver.user.test.writepermission=true
-ftpserver.user.test.uploadrate=0
diff --git a/other/java/hdfs2/README.md b/other/java/hdfs2/README.md
deleted file mode 100644
index e98b06506..000000000
--- a/other/java/hdfs2/README.md
+++ /dev/null
@@ -1,190 +0,0 @@
-# SeaweedFS Hadoop2 Client
-
-Hadoop FileSystem implementation for SeaweedFS, compatible with Hadoop 2.x/3.x.
-
-## Building
-
-```bash
-mvn clean install
-```
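-
-To build without running the test phase (for example, when no SeaweedFS instance is available yet), you can use Maven's standard skip flag:
-
-```bash
-mvn clean install -DskipTests
-```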
-
-## Testing
-
-This project includes two types of tests:
-
-### 1. Configuration Tests (No SeaweedFS Required)
-
-These tests verify configuration handling and initialization logic without requiring a running SeaweedFS instance:
-
-```bash
-mvn test -Dtest=SeaweedFileSystemConfigTest
-```
-
-### 2. Integration Tests (Requires SeaweedFS)
-
-These tests verify actual FileSystem operations against a running SeaweedFS instance.
-
-#### Prerequisites
-
-1. Start SeaweedFS with default ports:
- ```bash
- # Terminal 1: Start master
- weed master
-
- # Terminal 2: Start volume server
- weed volume -mserver=localhost:9333
-
- # Terminal 3: Start filer
- weed filer -master=localhost:9333
- ```
-
-2. Verify services are running:
- - Master: http://localhost:9333
- - Filer HTTP: http://localhost:8888
- - Filer gRPC: localhost:18888
-
-#### Running Integration Tests
-
-```bash
-# Enable integration tests
-export SEAWEEDFS_TEST_ENABLED=true
-
-# Run all tests
-mvn test
-
-# Run specific test
-mvn test -Dtest=SeaweedFileSystemTest
-```
-
-### Test Configuration
-
-Integration tests can be configured via environment variables or system properties:
-
-- `SEAWEEDFS_TEST_ENABLED`: Set to `true` to enable integration tests (default: false)
-- Tests use these default connection settings:
- - Filer Host: localhost
- - Filer HTTP Port: 8888
- - Filer gRPC Port: 18888
-
-### Running Tests with Custom Configuration
-
-To test against a different SeaweedFS instance, modify the test code or use Hadoop configuration:
-
-```java
-conf.set("fs.seaweed.filer.host", "your-host");
-conf.setInt("fs.seaweed.filer.port", 8888);
-conf.setInt("fs.seaweed.filer.port.grpc", 18888);
-```
-
-## Test Coverage
-
-The test suite covers:
-
-- **Configuration & Initialization**
- - URI parsing and configuration
- - Default values
- - Configuration overrides
- - Working directory management
-
-- **File Operations**
- - Create files
- - Read files
- - Write files
- - Append to files
- - Delete files
-
-- **Directory Operations**
- - Create directories
- - List directory contents
- - Delete directories (recursive and non-recursive)
-
-- **Metadata Operations**
- - Get file status
- - Set permissions
- - Set owner/group
- - Rename files and directories
-
-## Usage in Hadoop
-
-1. Copy the built JAR to your Hadoop classpath:
- ```bash
- cp target/seaweedfs-hadoop2-client-*.jar $HADOOP_HOME/share/hadoop/common/lib/
- ```
-
-2. Configure `core-site.xml`:
- ```xml
- <configuration>
- <property>
- <name>fs.seaweedfs.impl</name>
- <value>seaweed.hdfs.SeaweedFileSystem</value>
- </property>
- <property>
- <name>fs.seaweed.filer.host</name>
- <value>localhost</value>
- </property>
- <property>
- <name>fs.seaweed.filer.port</name>
- <value>8888</value>
- </property>
- <property>
- <name>fs.seaweed.filer.port.grpc</name>
- <value>18888</value>
- </property>
- </configuration>
- ```
-
-3. Use SeaweedFS with Hadoop commands:
- ```bash
- hadoop fs -ls seaweedfs://localhost:8888/
- hadoop fs -mkdir seaweedfs://localhost:8888/test
- hadoop fs -put local.txt seaweedfs://localhost:8888/test/
- ```
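-
-4. Optionally, make SeaweedFS the default filesystem in `core-site.xml` so the `seaweedfs://localhost:8888` prefix can be omitted from paths (an optional convenience using the standard Hadoop `fs.defaultFS` property; adjust the host and port to your filer):
-   ```xml
-   <property>
-     <name>fs.defaultFS</name>
-     <value>seaweedfs://localhost:8888</value>
-   </property>
-   ```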
-
-## Continuous Integration
-
-For CI environments, tests can be run in two modes:
-
-1. **Configuration Tests Only** (default, no SeaweedFS required):
- ```bash
- mvn test -Dtest=SeaweedFileSystemConfigTest
- ```
-
-2. **Full Integration Tests** (requires SeaweedFS):
- ```bash
- # Start SeaweedFS in CI environment
- # Then run:
- export SEAWEEDFS_TEST_ENABLED=true
- mvn test
- ```
-
-## Troubleshooting
-
-### Tests are skipped
-
-If you see "Skipping test - SEAWEEDFS_TEST_ENABLED not set":
-```bash
-export SEAWEEDFS_TEST_ENABLED=true
-```
-
-### Connection refused errors
-
-Ensure SeaweedFS is running and accessible:
-```bash
-curl http://localhost:8888/
-```
-
-### gRPC errors
-
-Verify the gRPC port is accessible:
-```bash
-# Should show the port is listening
-netstat -an | grep 18888
-```
-
-## Contributing
-
-When adding new features, please include:
-1. Configuration tests (no SeaweedFS required)
-2. Integration tests (with SEAWEEDFS_TEST_ENABLED guard)
-3. Documentation updates
-
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
deleted file mode 100644
index fd84befa0..000000000
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ /dev/null
@@ -1,333 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.seaweedfs</groupId>
- <artifactId>seaweedfs-hadoop2-client</artifactId>
- <name>SeaweedFS HDFS2 Client</name>
- <version>${seaweedfs.client.version}</version>
- <description>A java client for SeaweedFS.</description>
- <url>https://github.com/seaweedfs/seaweedfs</url>
- <developers>
- <developer>
- <name>Chris Lu</name>
- <email>chris.lu@gmail.com</email>
- <organization>SeaweedFS</organization>
- <organizationUrl>https://seaweedfs.com</organizationUrl>
- </developer>
- </developers>
- <licenses>
- <license>
- <name>The Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
- <scm>
- <connection>scm:git:git://github.com/seaweedfs/seaweedfs.git</connection>
- <developerConnection>scm:git:ssh://github.com:seaweedfs/seaweedfs.git</developerConnection>
- <url>https://github.com/seaweedfs/seaweedfs/tree/master</url>
- </scm>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- <release>8</release>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>org/slf4j/**</exclude>
- <exclude>META-INF/maven/org.slf4j/**</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer />
- </transformers>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>shaded.com.google</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.internal</pattern>
- <shadedPattern>shaded.io.grpc.internal</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons</pattern>
- <shadedPattern>shaded.org.apache.commons</shadedPattern>
- <excludes>
- <exclude>org.apache.hadoop</exclude>
- <exclude>org.apache.log4j</exclude>
- </excludes>
- </relocation>
- <relocation>
- <pattern>org.apache.http</pattern>
- <shadedPattern>shaded.org.apache.http</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.sonatype.central</groupId>
- <artifactId>central-publishing-maven-plugin</artifactId>
- <version>0.5.0</version>
- <extensions>true</extensions>
- <configuration>
- <publishingServerId>central</publishingServerId>
- <autoPublish>true</autoPublish>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.2.1</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>3.0.1</version>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.2.4</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>hadoop-hdfs-client</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-api</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-client</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-annotations</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>3.2.4</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>commons-cli</artifactId>
- <groupId>commons-cli</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-math3</artifactId>
- <groupId>org.apache.commons</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-io</artifactId>
- <groupId>commons-io</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-net</artifactId>
- <groupId>commons-net</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-collections</artifactId>
- <groupId>commons-collections</groupId>
- </exclusion>
- <exclusion>
- <artifactId>javax.servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <exclusion>
- <artifactId>javax.activation-api</artifactId>
- <groupId>javax.activation</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jetty-server</artifactId>
- <groupId>org.eclipse.jetty</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jetty-util</artifactId>
- <groupId>org.eclipse.jetty</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jetty-servlet</artifactId>
- <groupId>org.eclipse.jetty</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jetty-webapp</artifactId>
- <groupId>org.eclipse.jetty</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jsp-api</artifactId>
- <groupId>javax.servlet.jsp</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-core</artifactId>
- <groupId>com.sun.jersey</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-servlet</artifactId>
- <groupId>com.sun.jersey</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-json</artifactId>
- <groupId>com.sun.jersey</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-server</artifactId>
- <groupId>com.sun.jersey</groupId>
- </exclusion>
- <exclusion>
- <artifactId>reload4j</artifactId>
- <groupId>ch.qos.reload4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-beanutils</artifactId>
- <groupId>commons-beanutils</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-configuration2</artifactId>
- <groupId>org.apache.commons</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-lang3</artifactId>
- <groupId>org.apache.commons</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-text</artifactId>
- <groupId>org.apache.commons</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-reload4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>avro</artifactId>
- <groupId>org.apache.avro</groupId>
- </exclusion>
- <exclusion>
- <artifactId>re2j</artifactId>
- <groupId>com.google.re2j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-auth</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jsch</artifactId>
- <groupId>com.jcraft</groupId>
- </exclusion>
- <exclusion>
- <artifactId>curator-client</artifactId>
- <groupId>org.apache.curator</groupId>
- </exclusion>
- <exclusion>
- <artifactId>curator-recipes</artifactId>
- <groupId>org.apache.curator</groupId>
- </exclusion>
- <exclusion>
- <artifactId>htrace-core4</artifactId>
- <groupId>org.apache.htrace</groupId>
- </exclusion>
- <exclusion>
- <artifactId>zookeeper</artifactId>
- <groupId>org.apache.zookeeper</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-compress</artifactId>
- <groupId>org.apache.commons</groupId>
- </exclusion>
- <exclusion>
- <artifactId>kerb-simplekdc</artifactId>
- <groupId>org.apache.kerby</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jackson-databind</artifactId>
- <groupId>com.fasterxml.jackson.core</groupId>
- </exclusion>
- <exclusion>
- <artifactId>stax2-api</artifactId>
- <groupId>org.codehaus.woodstox</groupId>
- </exclusion>
- <exclusion>
- <artifactId>woodstox-core</artifactId>
- <groupId>com.fasterxml.woodstox</groupId>
- </exclusion>
- <exclusion>
- <artifactId>dnsjava</artifactId>
- <groupId>dnsjava</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-annotations</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
- <hadoop.version>3.2.4</hadoop.version>
- </properties>
-</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
deleted file mode 100644
index 7b4c2507d..000000000
--- a/other/java/hdfs2/pom.xml
+++ /dev/null
@@ -1,195 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
- <hadoop.version>3.4.0</hadoop.version>
- </properties>
-
- <groupId>com.seaweedfs</groupId>
- <artifactId>seaweedfs-hadoop2-client</artifactId>
- <version>${seaweedfs.client.version}</version>
-
- <name>SeaweedFS HDFS2 Client</name>
- <description>A java client for SeaweedFS.</description>
- <url>https://github.com/seaweedfs/seaweedfs</url>
- <licenses>
- <license>
- <name>The Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
- <developers>
- <developer>
- <name>Chris Lu</name>
- <email>chris.lu@gmail.com</email>
- <organization>SeaweedFS</organization>
- <organizationUrl>https://seaweedfs.com</organizationUrl>
- </developer>
- </developers>
- <scm>
- <connection>scm:git:git://github.com/seaweedfs/seaweedfs.git</connection>
- <developerConnection>scm:git:ssh://github.com:seaweedfs/seaweedfs.git</developerConnection>
- <url>https://github.com/seaweedfs/seaweedfs/tree/master</url>
- </scm>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- <release>8</release>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>org/slf4j/**</exclude>
- <exclude>META-INF/maven/org.slf4j/**</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- </transformers>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>shaded.com.google</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.internal</pattern>
- <shadedPattern>shaded.io.grpc.internal</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons</pattern>
- <shadedPattern>shaded.org.apache.commons</shadedPattern>
- <excludes>
- <exclude>org.apache.hadoop</exclude>
- <exclude>org.apache.log4j</exclude>
- </excludes>
- </relocation>
- <relocation>
- <pattern>org.apache.http</pattern>
- <shadedPattern>shaded.org.apache.http</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.sonatype.central</groupId>
- <artifactId>central-publishing-maven-plugin</artifactId>
- <version>0.5.0</version>
- <extensions>true</extensions>
- <configuration>
- <publishingServerId>central</publishingServerId>
- <autoPublish>true</autoPublish>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.2.1</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>3.0.1</version>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.seaweedfs</groupId>
- <artifactId>seaweedfs-client</artifactId>
- <version>${seaweedfs.client.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.13.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>3.12.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
deleted file mode 100644
index 3d0b68a52..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.fs.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
-
- public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
- super(in, size);
- if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
- throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
- }
- }
-
- @Override
- public int read(ByteBuffer buf) throws IOException {
- if (this.in instanceof ByteBufferReadable) {
- return ((ByteBufferReadable)this.in).read(buf);
- } else {
- throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
- }
- }
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
deleted file mode 100644
index e021401aa..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DelegateToFileSystem;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
-
- SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
- throws IOException, URISyntaxException {
- super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
- }
-
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
deleted file mode 100644
index 58fcaf975..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ /dev/null
@@ -1,634 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerProto;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
-public class SeaweedFileSystem extends FileSystem {
-
- public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
- public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
- public static final String FS_SEAWEED_FILER_PORT_GRPC = "fs.seaweed.filer.port.grpc";
- public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
- public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
- public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
- public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access";
- public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
- public static final String FS_SEAWEED_FILER_CN = "fs.seaweed.filer.cn";
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
-
- private URI uri;
- private Path workingDirectory = new Path("/");
- private SeaweedFileSystemStore seaweedFileSystemStore;
-
- public URI getUri() {
- return uri;
- }
-
- public String getScheme() {
- return "seaweedfs";
- }
-
- @Override
-    public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- // get host information from uri (overrides info in conf)
- String host = uri.getHost();
- host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host;
- conf.set(FS_SEAWEED_FILER_HOST, host);
-
- // get port information from uri, (overrides info in conf)
- int port = uri.getPort();
- port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
- conf.setInt(FS_SEAWEED_FILER_PORT, port);
-
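-        // By SeaweedFS convention the filer gRPC port defaults to the HTTP port + 10000 (e.g. 8888 -> 18888).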
- int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000);
-
- setConf(conf);
- this.uri = uri;
-
- String cn = conf.get(FS_SEAWEED_FILER_CN, "");
-
- seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, grpcPort, cn, conf);
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- this.seaweedFileSystemStore.close();
- }
-
- @Override
- public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-
- LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
-
- path = qualify(path);
-
- try {
- int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
- } catch (Exception ex) {
- LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
- return null;
- }
- }
-
- @Override
- public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress) throws IOException {
-
- LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
-
- path = qualify(path);
-
- try {
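-            // Default replica placement: the HDFS replication factor minus one, rendered as a SeaweedFS
-            // replication string (e.g. replication 1 -> "000"), unless fs.seaweed.replication overrides it.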
- String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
- int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
- return new FSDataOutputStream(outputStream, statistics);
- } catch (Exception ex) {
- LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
- return null;
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws FileNotFoundException if the parent directory is not present -or
- * is not a directory.
- */
- @Override
- public FSDataOutputStream createNonRecursive(Path path,
- FsPermission permission,
- EnumSet<CreateFlag> flags,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress) throws IOException {
- Path parent = path.getParent();
- if (parent != null) {
- // expect this to raise an exception if there is no parent
- if (!getFileStatus(parent).isDirectory()) {
- throw new FileAlreadyExistsException("Not a directory: " + parent);
- }
- }
- int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- return create(path, permission,
- flags.contains(CreateFlag.OVERWRITE), bufferSize,
- replication, seaweedBufferSize, progress);
- }
-
- @Override
- public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
-
- LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
-
- path = qualify(path);
- try {
- int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
- return new FSDataOutputStream(outputStream, statistics);
- } catch (Exception ex) {
- LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
- return null;
- }
- }
-
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
-
- LOG.debug("rename path: {} => {}", src, dst);
-
- if (src.isRoot()) {
- return false;
- }
-
- if (src.equals(dst)) {
- return true;
- }
- FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
-
- Path adjustedDst = dst;
-
- if (entry != null) {
- FileStatus dstFileStatus = getFileStatus(dst);
- String sourceFileName = src.getName();
- if (!dstFileStatus.isDirectory()) {
- return false;
- }
- adjustedDst = new Path(dst, sourceFileName);
- }
-
- Path qualifiedSrcPath = qualify(src);
- Path qualifiedDstPath = qualify(adjustedDst);
-
- seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
- return true;
- }
-
- @Override
- public boolean delete(Path path, boolean recursive) throws IOException {
-
- LOG.debug("delete path: {} recursive:{}", path, recursive);
-
- path = qualify(path);
-
- FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
-
- if (entry == null) {
- return true;
- }
-
- FileStatus fileStatus = getFileStatus(path);
-
- return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
-
- }
-
- @Override
- public FileStatus[] listStatus(Path path) throws IOException {
-
- LOG.debug("listStatus path: {}", path);
-
- path = qualify(path);
-
- return seaweedFileSystemStore.listEntries(path);
- }
-
- @Override
- public Path getWorkingDirectory() {
- return workingDirectory;
- }
-
- @Override
- public void setWorkingDirectory(Path path) {
- if (path.isAbsolute()) {
- workingDirectory = path;
- } else {
- workingDirectory = new Path(workingDirectory, path);
- }
- }
-
- @Override
- public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
-
- LOG.debug("mkdirs path: {}", path);
-
- path = qualify(path);
-
- FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
-
- if (entry == null) {
-
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- return seaweedFileSystemStore.createDirectory(path, currentUser,
- fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
- FsPermission.getUMask(getConf()));
-
- }
-
- FileStatus fileStatus = getFileStatus(path);
-
- if (fileStatus.isDirectory()) {
- return true;
- } else {
- throw new FileAlreadyExistsException("Path is a file: " + path);
- }
- }
-
- @Override
- public FileStatus getFileStatus(Path path) throws IOException {
-
- LOG.debug("getFileStatus path: {}", path);
-
- path = qualify(path);
-
- return seaweedFileSystemStore.getFileStatus(path);
- }
-
- /**
- * Set owner of a path (i.e. a file or a directory).
- * The parameters owner and group cannot both be null.
- *
- * @param path The path
- * @param owner If it is null, the original username remains unchanged.
- * @param group If it is null, the original groupname remains unchanged.
- */
- @Override
- public void setOwner(Path path, final String owner, final String group)
- throws IOException {
- LOG.debug("setOwner path: {}", path);
- path = qualify(path);
-
- seaweedFileSystemStore.setOwner(path, owner, group);
- }
-
-
- /**
- * Set permission of a path.
- *
- * @param path The path
- * @param permission Access permission
- */
- @Override
- public void setPermission(Path path, final FsPermission permission) throws IOException {
- LOG.debug("setPermission path: {}", path);
-
- if (permission == null) {
- throw new IllegalArgumentException("The permission can't be null");
- }
-
- path = qualify(path);
-
- seaweedFileSystemStore.setPermission(path, permission);
- }
-
- Path qualify(Path path) {
- return path.makeQualified(uri, workingDirectory);
- }
-
- /**
- * Concat existing files together.
- *
- * @param trg the path to the target destination.
- * @param psrcs the paths to the sources to use for the concatenation.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default).
- */
- @Override
- public void concat(final Path trg, final Path[] psrcs) throws IOException {
- throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
- }
-
- /**
- * Truncate the file in the indicated path to the indicated size.
- * <ul>
- * <li>Fails if path is a directory.</li>
- * <li>Fails if path does not exist.</li>
- * <li>Fails if path is not closed.</li>
- * <li>Fails if new size is greater than current size.</li>
- * </ul>
- *
- * @param f The path to the file to be truncated
- * @param newLength The size the file is to be truncated to
- * @return <code>true</code> if the file has been truncated to the desired
- * <code>newLength</code> and is immediately available to be reused for
- * write operations such as <code>append</code>, or
- * <code>false</code> if a background process of adjusting the length of
- * the last block has been started, and clients should wait for it to
- * complete before proceeding with further file updates.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default).
- */
- @Override
- public boolean truncate(Path f, long newLength) throws IOException {
- throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
- }
-
- @Override
- public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws
- IOException {
- // Supporting filesystems should override this method
- throw new UnsupportedOperationException(
- "Filesystem does not support symlinks!");
- }
-
- public boolean supportsSymlinks() {
- return false;
- }
-
- /**
- * Create a snapshot.
- *
- * @param path The directory where snapshots will be taken.
- * @param snapshotName The name of the snapshot
- * @return the snapshot path.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- */
- @Override
- public Path createSnapshot(Path path, String snapshotName)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support createSnapshot");
- }
-
- /**
- * Rename a snapshot.
- *
- * @param path The directory path where the snapshot was taken
- * @param snapshotOldName Old name of the snapshot
- * @param snapshotNewName New name of the snapshot
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void renameSnapshot(Path path, String snapshotOldName,
- String snapshotNewName) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support renameSnapshot");
- }
-
- /**
- * Delete a snapshot of a directory.
- *
- * @param path The directory that the to-be-deleted snapshot belongs to
- * @param snapshotName The name of the snapshot
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void deleteSnapshot(Path path, String snapshotName)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support deleteSnapshot");
- }
-
- /**
- * Modifies ACL entries of files and directories. This method can add new ACL
- * entries or modify the permissions on existing ACL entries. All existing
- * ACL entries that are not specified in this call are retained without
- * changes. (Modifications are merged into the current ACL.)
- *
- * @param path Path to modify
- * @param aclSpec List&lt;AclEntry&gt; describing modifications
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support modifyAclEntries");
- }
-
- /**
- * Removes ACL entries from files and directories. Other ACL entries are
- * retained.
- *
- * @param path Path to modify
- * @param aclSpec List describing entries to remove
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAclEntries");
- }
-
- /**
- * Removes all default ACL entries from files and directories.
- *
- * @param path Path to modify
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeDefaultAcl(Path path)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeDefaultAcl");
- }
-
- /**
- * Removes all but the base ACL entries of files and directories. The entries
- * for user, group, and others are retained for compatibility with permission
- * bits.
- *
- * @param path Path to modify
- * @throws IOException if an ACL could not be removed
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeAcl(Path path)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAcl");
- }
-
- /**
- * Fully replaces ACL of files and directories, discarding all existing
- * entries.
- *
- * @param path Path to modify
- * @param aclSpec List describing modifications, which must include entries
- * for user, group, and others for compatibility with permission bits.
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setAcl");
- }
-
- /**
- * Gets the ACL of a file or directory.
- *
- * @param path Path to get
- * @return AclStatus describing the ACL of the file or directory
- * @throws IOException if an ACL could not be read
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public AclStatus getAclStatus(Path path) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getAclStatus");
- }
-
- /**
- * Set an xattr of a file or directory.
- * The name must be prefixed with the namespace followed by ".". For example,
- * "user.attr".
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to modify
- * @param name xattr name.
- * @param value xattr value.
- * @param flag xattr set flag
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void setXAttr(Path path, String name, byte[] value,
- EnumSet<XAttrSetFlag> flag) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setXAttr");
- }
-
- /**
- * Get an xattr name and value for a file or directory.
- * The name must be prefixed with the namespace followed by ".". For example,
- * "user.attr".
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attribute
- * @param name xattr name.
- * @return byte[] xattr value.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public byte[] getXAttr(Path path, String name) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttr");
- }
-
- /**
- * Get all of the xattr name/value pairs for a file or directory.
- * Only those xattrs which the logged-in user has permissions to view
- * are returned.
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attributes
- * @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public Map<String, byte[]> getXAttrs(Path path) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
- }
-
- /**
- * Get all of the xattrs name/value pairs for a file or directory.
- * Only those xattrs which the logged-in user has permissions to view
- * are returned.
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attributes
- * @param names XAttr names.
- * @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public Map<String, byte[]> getXAttrs(Path path, List<String> names)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
- }
-
- /**
- * Get all of the xattr names for a file or directory.
- * Only those xattr names which the logged-in user has permissions to view
- * are returned.
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attributes
- * @return List{@literal <String>} of the XAttr names of the file or directory
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public List<String> listXAttrs(Path path) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support listXAttrs");
- }
-
- /**
- * Remove an xattr of a file or directory.
- * The name must be prefixed with the namespace followed by ".". For example,
- * "user.attr".
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to remove extended attribute
- * @param name xattr name
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeXAttr(Path path, String name) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeXAttr");
- }
-
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
deleted file mode 100644
index f65c1961b..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ /dev/null
@@ -1,291 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.*;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static seaweed.hdfs.SeaweedFileSystem.*;
-
-public class SeaweedFileSystemStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
-
- private FilerClient filerClient;
- private Configuration conf;
-
- public SeaweedFileSystemStore(String host, int port, int grpcPort, String cn, Configuration conf) {
- filerClient = new FilerClient(host, port, grpcPort, cn);
- this.conf = conf;
- String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
- if (volumeServerAccessMode.equals("publicUrl")) {
- filerClient.setAccessVolumeServerByPublicUrl();
- } else if (volumeServerAccessMode.equals("filerProxy")) {
- filerClient.setAccessVolumeServerByFilerProxy();
- }
- }
-
- public void close() {
- try {
- this.filerClient.shutdown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public static String getParentDirectory(Path path) {
- return path.isRoot() ? "/" : path.getParent().toUri().getPath();
- }
-
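-    // Convert a Hadoop FsPermission to a SeaweedFS file mode; directories additionally set the top bit (1 << 31).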
- static int permissionToMode(FsPermission permission, boolean isDirectory) {
- int p = permission.toShort();
- if (isDirectory) {
- p = p | 1 << 31;
- }
- return p;
- }
-
- public boolean createDirectory(final Path path, UserGroupInformation currentUser,
- final FsPermission permission, final FsPermission umask) {
-
- LOG.debug("createDirectory path: {} permission: {} umask: {}",
- path,
- permission,
- umask);
-
- return filerClient.mkdirs(
- path.toUri().getPath(),
- permissionToMode(permission, true),
- currentUser.getUserName(),
- currentUser.getGroupNames()
- );
- }
-
- public FileStatus[] listEntries(final Path path) throws IOException {
- LOG.debug("listEntries path: {}", path);
-
- FileStatus pathStatus = getFileStatus(path);
-
- if (pathStatus == null) {
- return new FileStatus[0];
- }
-
- if (!pathStatus.isDirectory()) {
- return new FileStatus[]{pathStatus};
- }
-
- List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
-
- List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
-
- for (FilerProto.Entry entry : entries) {
-
- FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry);
-
- fileStatuses.add(fileStatus);
- }
- LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size());
- return fileStatuses.toArray(new FileStatus[0]);
-
- }
-
- public FileStatus getFileStatus(final Path path) throws IOException {
-
- FilerProto.Entry entry = lookupEntry(path);
- if (entry == null) {
- throw new FileNotFoundException("File does not exist: " + path);
- }
- LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
-
- FileStatus fileStatus = doGetFileStatus(path, entry);
- return fileStatus;
- }
-
- public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
- LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
- path,
- String.valueOf(isDirectory),
- String.valueOf(recursive));
-
- if (path.isRoot()) {
- return true;
- }
-
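-        // For a recursive directory delete, remove the child entries first, then drop the directory entry itself below.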
- if (recursive && isDirectory) {
- List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
- for (FilerProto.Entry entry : entries) {
- deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true);
- }
- }
-
- return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive, true);
- }
-
- private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
- FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = SeaweedRead.fileSize(entry);
- boolean isDir = entry.getIsDirectory();
- int block_replication = 1;
- int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- long modification_time = attributes.getMtime() * 1000; // milliseconds
- long access_time = 0;
- FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
- String owner = attributes.getUserName();
- String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
- return new FileStatus(length, isDir, block_replication, blocksize,
- modification_time, access_time, permission, owner, group, null, path);
- }
-
- public FilerProto.Entry lookupEntry(Path path) {
-
- return filerClient.lookupEntry(getParentDirectory(path), path.getName());
-
- }
-
- public void rename(Path source, Path destination) {
-
- LOG.debug("rename source: {} destination:{}", source, destination);
-
- if (source.isRoot()) {
- return;
- }
- LOG.info("rename source: {} destination:{}", source, destination);
- FilerProto.Entry entry = lookupEntry(source);
- if (entry == null) {
- LOG.warn("rename non-existing source: {}", source);
- return;
- }
- filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
- }
-
- public OutputStream createFile(final Path path,
- final boolean overwrite,
- FsPermission permission,
- int bufferSize,
- String replication) throws IOException {
-
- permission = permission == null ? FsPermission.getFileDefault() : permission;
-
- LOG.debug("createFile path: {} overwrite: {} permission: {}",
- path,
- overwrite,
- permission.toString());
-
- UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
- long now = System.currentTimeMillis() / 1000L;
-
- FilerProto.Entry.Builder entry = null;
- long writePosition = 0;
- if (!overwrite) {
- FilerProto.Entry existingEntry = lookupEntry(path);
- LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry);
- if (existingEntry != null) {
- entry = FilerProto.Entry.newBuilder();
- entry.mergeFrom(existingEntry);
- entry.clearContent();
- entry.getAttributesBuilder().setMtime(now);
- LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- writePosition = SeaweedRead.fileSize(existingEntry);
- }
- }
- if (entry == null) {
- entry = FilerProto.Entry.newBuilder()
- .setName(path.getName())
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setFileMode(permissionToMode(permission, false))
- .setCrtime(now)
- .setMtime(now)
- .setUserName(userGroupInformation.getUserName())
- .clearGroupName()
- .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
- );
- SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
- }
-
- return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
-
- }
-
- public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException {
-
- LOG.debug("openFileForRead path:{}", path);
-
- FilerProto.Entry entry = lookupEntry(path);
-
- if (entry == null) {
- throw new FileNotFoundException("read non-exist file " + path);
- }
-
- return new SeaweedHadoopInputStream(filerClient,
- statistics,
- path.toUri().getPath(),
- entry);
- }
-
- public void setOwner(Path path, String owner, String group) {
-
- LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group);
-
- FilerProto.Entry entry = lookupEntry(path);
- if (entry == null) {
- LOG.debug("setOwner path:{} entry:{}", path, entry);
- return;
- }
-
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
-
- if (owner != null) {
- attributesBuilder.setUserName(owner);
- }
- if (group != null) {
- attributesBuilder.clearGroupName();
- attributesBuilder.addGroupName(group);
- }
-
- entryBuilder.setAttributes(attributesBuilder);
-
- LOG.debug("setOwner path:{} entry:{}", path, entryBuilder);
-
- filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
-
- }
-
- public void setPermission(Path path, FsPermission permission) {
-
- LOG.debug("setPermission path:{} permission:{}", path, permission);
-
- FilerProto.Entry entry = lookupEntry(path);
- if (entry == null) {
- LOG.debug("setPermission path:{} entry:{}", path, entry);
- return;
- }
-
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
-
- attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory()));
-
- entryBuilder.setAttributes(attributesBuilder);
-
- LOG.debug("setPermission path:{} entry:{}", path, entryBuilder);
-
- filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
-
- }
-
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
deleted file mode 100644
index f26eae597..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package seaweed.hdfs;
-
-// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedInputStream;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
-
- private final SeaweedInputStream seaweedInputStream;
- private final Statistics statistics;
-
- public SeaweedHadoopInputStream(
- final FilerClient filerClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry) throws IOException {
- this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
- this.statistics = statistics;
- }
-
- @Override
- public int read() throws IOException {
- return seaweedInputStream.read();
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- return seaweedInputStream.read(b, off, len);
- }
-
- // implement ByteBufferReadable
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
- int bytesRead = seaweedInputStream.read(buf);
-
- if (bytesRead > 0) {
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
- }
-
- return bytesRead;
- }
-
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
- public synchronized void seek(long n) throws IOException {
- seaweedInputStream.seek(n);
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- return seaweedInputStream.skip(n);
- }
-
- /**
- * Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- * <p>
- * This is to match the behavior of DFSInputStream.available(),
- * which some clients may rely on (HBase write-ahead log reading in
- * particular).
- */
- @Override
- public synchronized int available() throws IOException {
- return seaweedInputStream.available();
- }
-
- /**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
-     * they won't be reflected in the returned length.
- *
- * @return length of the file.
- * @throws IOException if the stream is closed
- */
- public long length() throws IOException {
- return seaweedInputStream.length();
- }
-
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public synchronized long getPos() throws IOException {
- return seaweedInputStream.getPos();
- }
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- seaweedInputStream.close();
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
deleted file mode 100644
index da5b56bbc..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedOutputStream;
-
-public class SeaweedHadoopOutputStream extends SeaweedOutputStream {
-
- public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
- super(filerClient, path, entry, position, bufferSize, replication);
- }
-
-}
diff --git a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java b/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java
deleted file mode 100644
index bcc08b8e2..000000000
--- a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Unit tests for SeaweedFileSystem configuration that don't require a running SeaweedFS instance.
- *
- * These tests verify basic properties and constants.
- */
-public class SeaweedFileSystemConfigTest {
-
- private SeaweedFileSystem fs;
- private Configuration conf;
-
- @Before
- public void setUp() {
- fs = new SeaweedFileSystem();
- conf = new Configuration();
- }
-
- @Test
- public void testScheme() {
- assertEquals("seaweedfs", fs.getScheme());
- }
-
- @Test
- public void testConstants() {
- // Test that constants are defined correctly
- assertEquals("fs.seaweed.filer.host", SeaweedFileSystem.FS_SEAWEED_FILER_HOST);
- assertEquals("fs.seaweed.filer.port", SeaweedFileSystem.FS_SEAWEED_FILER_PORT);
- assertEquals("fs.seaweed.filer.port.grpc", SeaweedFileSystem.FS_SEAWEED_FILER_PORT_GRPC);
- assertEquals(8888, SeaweedFileSystem.FS_SEAWEED_DEFAULT_PORT);
- assertEquals("fs.seaweed.buffer.size", SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE);
- assertEquals(4 * 1024 * 1024, SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- assertEquals("fs.seaweed.replication", SeaweedFileSystem.FS_SEAWEED_REPLICATION);
- assertEquals("fs.seaweed.volume.server.access", SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS);
- assertEquals("fs.seaweed.filer.cn", SeaweedFileSystem.FS_SEAWEED_FILER_CN);
- }
-
- @Test
- public void testWorkingDirectoryPathOperations() {
- // Test path operations that don't require initialization
- Path testPath = new Path("/test/path");
- assertTrue("Path should be absolute", testPath.isAbsolute());
- assertEquals("/test/path", testPath.toUri().getPath());
-
- Path childPath = new Path(testPath, "child");
- assertEquals("/test/path/child", childPath.toUri().getPath());
- }
-
- @Test
- public void testConfigurationProperties() {
- // Test that configuration can be set and read
- conf.set(SeaweedFileSystem.FS_SEAWEED_FILER_HOST, "testhost");
- assertEquals("testhost", conf.get(SeaweedFileSystem.FS_SEAWEED_FILER_HOST));
-
- conf.setInt(SeaweedFileSystem.FS_SEAWEED_FILER_PORT, 9999);
- assertEquals(9999, conf.getInt(SeaweedFileSystem.FS_SEAWEED_FILER_PORT, 0));
-
- conf.setInt(SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE, 8 * 1024 * 1024);
- assertEquals(8 * 1024 * 1024, conf.getInt(SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE, 0));
-
- conf.set(SeaweedFileSystem.FS_SEAWEED_REPLICATION, "001");
- assertEquals("001", conf.get(SeaweedFileSystem.FS_SEAWEED_REPLICATION));
-
- conf.set(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS, "publicUrl");
- assertEquals("publicUrl", conf.get(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS));
-
- conf.set(SeaweedFileSystem.FS_SEAWEED_FILER_CN, "test-cn");
- assertEquals("test-cn", conf.get(SeaweedFileSystem.FS_SEAWEED_FILER_CN));
- }
-
- @Test
- public void testDefaultBufferSize() {
- // Test default buffer size constant
- int expected = 4 * 1024 * 1024; // 4MB
- assertEquals(expected, SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- }
-
- @Test
- public void testDefaultPort() {
- // Test default port constant
- assertEquals(8888, SeaweedFileSystem.FS_SEAWEED_DEFAULT_PORT);
- }
-}
diff --git a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java b/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java
deleted file mode 100644
index ec43b3481..000000000
--- a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.URI;
-
-import static org.junit.Assert.*;
-
-/**
- * Unit tests for SeaweedFileSystem.
- *
- * These tests verify basic FileSystem operations against a SeaweedFS backend.
- * Note: These tests require a running SeaweedFS filer instance.
- *
- * To run tests, ensure SeaweedFS is running with default ports:
- * - Filer HTTP: 8888
- * - Filer gRPC: 18888
- *
- * Set environment variable SEAWEEDFS_TEST_ENABLED=true to enable these tests.
- */
-public class SeaweedFileSystemTest {
-
- private SeaweedFileSystem fs;
- private Configuration conf;
- private static final String TEST_ROOT = "/test-hdfs2";
- private static final boolean TESTS_ENABLED =
- "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
-
- @Before
- public void setUp() throws Exception {
- if (!TESTS_ENABLED) {
- return;
- }
-
- conf = new Configuration();
- conf.set("fs.seaweed.filer.host", "localhost");
- conf.setInt("fs.seaweed.filer.port", 8888);
- conf.setInt("fs.seaweed.filer.port.grpc", 18888);
-
- fs = new SeaweedFileSystem();
- URI uri = new URI("seaweedfs://localhost:8888/");
- fs.initialize(uri, conf);
-
- // Clean up any existing test directory
- Path testPath = new Path(TEST_ROOT);
- if (fs.exists(testPath)) {
- fs.delete(testPath, true);
- }
- }
-
- @After
- public void tearDown() throws Exception {
- if (!TESTS_ENABLED || fs == null) {
- return;
- }
-
- // Clean up test directory
- Path testPath = new Path(TEST_ROOT);
- if (fs.exists(testPath)) {
- fs.delete(testPath, true);
- }
-
- fs.close();
- }
-
- @Test
- public void testInitialization() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- assertNotNull(fs);
- assertEquals("seaweedfs", fs.getScheme());
- assertNotNull(fs.getUri());
- assertEquals("/", fs.getWorkingDirectory().toUri().getPath());
- }
-
- @Test
- public void testMkdirs() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testDir = new Path(TEST_ROOT + "/testdir");
- assertTrue("Failed to create directory", fs.mkdirs(testDir));
- assertTrue("Directory should exist", fs.exists(testDir));
-
- FileStatus status = fs.getFileStatus(testDir);
- assertTrue("Path should be a directory", status.isDirectory());
- }
-
- @Test
- public void testCreateAndReadFile() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testFile = new Path(TEST_ROOT + "/testfile.txt");
- String testContent = "Hello, SeaweedFS!";
-
- // Create and write to file
- FSDataOutputStream out = fs.create(testFile, FsPermission.getDefault(),
- false, 4096, (short) 1, 4 * 1024 * 1024, null);
- assertNotNull("Output stream should not be null", out);
- out.write(testContent.getBytes());
- out.close();
-
- // Verify file exists
- assertTrue("File should exist", fs.exists(testFile));
-
- // Read and verify content
- FSDataInputStream in = fs.open(testFile, 4096);
- assertNotNull("Input stream should not be null", in);
- byte[] buffer = new byte[testContent.length()];
- int bytesRead = in.read(buffer);
- in.close();
-
- assertEquals("Should read all bytes", testContent.length(), bytesRead);
- assertEquals("Content should match", testContent, new String(buffer));
- }
-
- @Test
- public void testFileStatus() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testFile = new Path(TEST_ROOT + "/statustest.txt");
- String content = "test content";
-
- FSDataOutputStream out = fs.create(testFile);
- out.write(content.getBytes());
- out.close();
-
- FileStatus status = fs.getFileStatus(testFile);
- assertNotNull("FileStatus should not be null", status);
- assertFalse("Should not be a directory", status.isDirectory());
- assertTrue("Should be a file", status.isFile());
- assertEquals("File length should match", content.length(), status.getLen());
- assertNotNull("Path should not be null", status.getPath());
- }
-
- @Test
- public void testListStatus() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testDir = new Path(TEST_ROOT + "/listtest");
- fs.mkdirs(testDir);
-
- // Create multiple files
- for (int i = 0; i < 3; i++) {
- Path file = new Path(testDir, "file" + i + ".txt");
- FSDataOutputStream out = fs.create(file);
- out.write(("content" + i).getBytes());
- out.close();
- }
-
- FileStatus[] statuses = fs.listStatus(testDir);
- assertNotNull("List should not be null", statuses);
- assertEquals("Should have 3 files", 3, statuses.length);
- }
-
- @Test
- public void testRename() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path srcFile = new Path(TEST_ROOT + "/source.txt");
- Path dstFile = new Path(TEST_ROOT + "/destination.txt");
- String content = "rename test";
-
- // Create source file
- FSDataOutputStream out = fs.create(srcFile);
- out.write(content.getBytes());
- out.close();
-
- assertTrue("Source file should exist", fs.exists(srcFile));
-
- // Rename
- assertTrue("Rename should succeed", fs.rename(srcFile, dstFile));
-
- // Verify
- assertFalse("Source file should not exist", fs.exists(srcFile));
- assertTrue("Destination file should exist", fs.exists(dstFile));
-
- // Verify content preserved
- FSDataInputStream in = fs.open(dstFile);
- byte[] buffer = new byte[content.length()];
- in.read(buffer);
- in.close();
- assertEquals("Content should be preserved", content, new String(buffer));
- }
-
- @Test
- public void testDelete() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testFile = new Path(TEST_ROOT + "/deletetest.txt");
-
- // Create file
- FSDataOutputStream out = fs.create(testFile);
- out.write("delete me".getBytes());
- out.close();
-
- assertTrue("File should exist before delete", fs.exists(testFile));
-
- // Delete
- assertTrue("Delete should succeed", fs.delete(testFile, false));
- assertFalse("File should not exist after delete", fs.exists(testFile));
- }
-
- @Test
- public void testDeleteDirectory() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testDir = new Path(TEST_ROOT + "/deletedir");
- Path testFile = new Path(testDir, "file.txt");
-
- // Create directory with file
- fs.mkdirs(testDir);
- FSDataOutputStream out = fs.create(testFile);
- out.write("content".getBytes());
- out.close();
-
- assertTrue("Directory should exist", fs.exists(testDir));
- assertTrue("File should exist", fs.exists(testFile));
-
- // Recursive delete
- assertTrue("Recursive delete should succeed", fs.delete(testDir, true));
- assertFalse("Directory should not exist after delete", fs.exists(testDir));
- assertFalse("File should not exist after delete", fs.exists(testFile));
- }
-
- @Test
- public void testAppend() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testFile = new Path(TEST_ROOT + "/appendtest.txt");
- String initialContent = "initial";
- String appendContent = " appended";
-
- // Create initial file
- FSDataOutputStream out = fs.create(testFile);
- out.write(initialContent.getBytes());
- out.close();
-
- // Append
- FSDataOutputStream appendOut = fs.append(testFile, 4096, null);
- assertNotNull("Append stream should not be null", appendOut);
- appendOut.write(appendContent.getBytes());
- appendOut.close();
-
- // Verify combined content
- FSDataInputStream in = fs.open(testFile);
- byte[] buffer = new byte[initialContent.length() + appendContent.length()];
- int bytesRead = in.read(buffer);
- in.close();
-
- String expected = initialContent + appendContent;
- assertEquals("Should read all bytes", expected.length(), bytesRead);
- assertEquals("Content should match", expected, new String(buffer));
- }
-
- @Test
- public void testSetWorkingDirectory() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path originalWd = fs.getWorkingDirectory();
- assertEquals("Original working directory should be /", "/", originalWd.toUri().getPath());
-
- Path newWd = new Path(TEST_ROOT);
- fs.mkdirs(newWd);
- fs.setWorkingDirectory(newWd);
-
- Path currentWd = fs.getWorkingDirectory();
- assertTrue("Working directory should be updated",
- currentWd.toUri().getPath().contains(TEST_ROOT));
- }
-
- @Test
- public void testSetPermission() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testFile = new Path(TEST_ROOT + "/permtest.txt");
-
- // Create file
- FSDataOutputStream out = fs.create(testFile);
- out.write("permission test".getBytes());
- out.close();
-
- // Set permission
- FsPermission newPerm = new FsPermission((short) 0644);
- fs.setPermission(testFile, newPerm);
-
- FileStatus status = fs.getFileStatus(testFile);
- assertNotNull("Permission should not be null", status.getPermission());
- }
-
- @Test
- public void testSetOwner() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path testFile = new Path(TEST_ROOT + "/ownertest.txt");
-
- // Create file
- FSDataOutputStream out = fs.create(testFile);
- out.write("owner test".getBytes());
- out.close();
-
- // Set owner - this may not fail even if not fully implemented
- fs.setOwner(testFile, "testuser", "testgroup");
-
- // Just verify the call doesn't throw an exception
- FileStatus status = fs.getFileStatus(testFile);
- assertNotNull("FileStatus should not be null", status);
- }
-
- @Test
- public void testRenameToExistingDirectory() throws Exception {
- if (!TESTS_ENABLED) {
- System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
- return;
- }
-
- Path srcFile = new Path(TEST_ROOT + "/movefile.txt");
- Path dstDir = new Path(TEST_ROOT + "/movedir");
-
- // Create source file and destination directory
- FSDataOutputStream out = fs.create(srcFile);
- out.write("move test".getBytes());
- out.close();
- fs.mkdirs(dstDir);
-
- // Rename file to existing directory (should move file into directory)
- assertTrue("Rename to directory should succeed", fs.rename(srcFile, dstDir));
-
- // File should be moved into the directory
- Path expectedLocation = new Path(dstDir, srcFile.getName());
- assertTrue("File should exist in destination directory", fs.exists(expectedLocation));
- assertFalse("Source file should not exist", fs.exists(srcFile));
- }
-}
-
diff --git a/other/java/hdfs3/README.md b/other/java/hdfs3/README.md
index f1afee264..e08f02a7c 100644
--- a/other/java/hdfs3/README.md
+++ b/other/java/hdfs3/README.md
@@ -130,6 +130,15 @@ The test suite covers:
<name>fs.seaweed.filer.port.grpc</name>
<value>18888</value>
</property>
+  <!-- Optional: replication configuration with three priority levels:
+       1) Set to a non-empty value (e.g. "001"): that value is used.
+       2) Set to the empty string "": the SeaweedFS filer's default replication is used.
+       3) Not configured (property absent): the HDFS replication parameter is used.
+  -->
+ <!-- <property>
+ <name>fs.seaweed.replication</name>
+ <value>001</value>
+ </property> -->
</configuration>
```
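
As a rough illustration of the three priority levels described above, the sketch below mirrors the resolution logic that this patch adds to `SeaweedFileSystem.create()`. The standalone class and method name are hypothetical; only the ordering is taken from the patch.

```java
import org.apache.hadoop.conf.Configuration;

// Illustrative only: mirrors the replication resolution order added in this patch.
public final class ReplicationResolution {

    static String resolveReplication(Configuration conf, short hdfsReplication) {
        String configured = conf.get("fs.seaweed.replication"); // may be null, "", or e.g. "001"
        if (configured == null) {
            // Priority 3: property absent -> derive "00N" from the HDFS replication
            // parameter, i.e. N = hdfsReplication - 1 extra replicas in the same rack.
            return String.format("%03d", hdfsReplication - 1);
        }
        // Priority 1: a non-empty value such as "001" is passed through unchanged.
        // Priority 2: an empty string defers to the filer's default replication.
        return configured;
    }
}
```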
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index d3c2751a5..c6579c3fb 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -572,7 +572,7 @@
</dependency>
</dependencies>
<properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
+ <seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 061d4d700..824db8264 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>3.80</seaweedfs.client.version>
+ <seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 58fcaf975..513266d69 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -59,7 +59,7 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
- int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000);
+ int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000);
setConf(conf);
this.uri = uri;
@@ -85,29 +85,45 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
+
+ // Use BufferedFSInputStream for all streams (like RawLocalFileSystem).
+ // This ensures proper position tracking for positioned reads,
+ // which is critical for Parquet.
+ return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
- LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
- return null;
+ LOG.error("Failed to open file: {} bufferSize:{}", path, bufferSize, ex);
+ throw new IOException("Failed to open file: " + path, ex);
}
}
@Override
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress) throws IOException {
+ final short replication, final long blockSize, final Progressable progress) throws IOException {
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
path = qualify(path);
+ final Path finalPath = path; // For use in anonymous inner class
try {
- String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
+ // Priority: 1) non-empty FS_SEAWEED_REPLICATION -> use that value,
+ // 2) empty string -> filer default, 3) not set (null) -> HDFS replication
+ String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION);
+ if (replicaPlacement == null) {
+ // Not configured: fall back to the HDFS replication parameter.
+ // This builds a "00N" replication string where N = replication - 1,
+ // placing N extra replicas on different servers
+ // within the same rack.
+ replicaPlacement = String.format("%03d", replication - 1);
+ }
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path,
+ overwrite, permission,
+ seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
- LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
- return null;
+ LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
+ throw new IOException("Failed to create file: " + path, ex);
}
}
@@ -119,12 +135,12 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
- FsPermission permission,
- EnumSet<CreateFlag> flags,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress) throws IOException {
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
@@ -144,13 +160,15 @@ public class SeaweedFileSystem extends FileSystem {
LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
path = qualify(path);
+ final Path finalPath = path; // For use in anonymous inner class
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, "");
+ SeaweedHadoopOutputStream outputStream = (SeaweedHadoopOutputStream) seaweedFileSystemStore.createFile(path,
+ false, null, seaweedBufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
- LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex);
- return null;
+ LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex);
+ throw new IOException("Failed to append to file: " + path, ex);
}
}
@@ -283,7 +301,6 @@ public class SeaweedFileSystem extends FileSystem {
seaweedFileSystemStore.setOwner(path, owner, group);
}
-
/**
* Set permission of a path.
*
@@ -334,11 +351,11 @@ public class SeaweedFileSystem extends FileSystem {
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
- * <code>newLength</code> and is immediately available to be reused for
- * write operations such as <code>append</code>, or
- * <code>false</code> if a background process of adjusting the length of
- * the last block has been started, and clients should wait for it to
- * complete before proceeding with further file updates.
+ * <code>newLength</code> and is immediately available to be reused for
+ * write operations such as <code>append</code>, or
+ * <code>false</code> if a background process of adjusting the length of
+ * the last block has been started, and clients should wait for it to
+ * complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
@@ -351,8 +368,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws
- IOException {
+ final boolean createParent) throws IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
@@ -390,7 +406,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
- String snapshotNewName) throws IOException {
+ String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
@@ -412,10 +428,10 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
- * Modifies ACL entries of files and directories. This method can add new ACL
- * entries or modify the permissions on existing ACL entries. All existing
+ * Modifies ACL entries of files and directories. This method can add new ACL
+ * entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
- * changes. (Modifications are merged into the current ACL.)
+ * changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
@@ -431,7 +447,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
- * Removes ACL entries from files and directories. Other ACL entries are
+ * Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
@@ -463,7 +479,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
- * Removes all but the base ACL entries of files and directories. The entries
+ * Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
@@ -485,7 +501,8 @@ public class SeaweedFileSystem extends FileSystem {
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
- * for user, group, and others for compatibility with permission bits.
+ * for user, group, and others for compatibility with permission
+ * bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
@@ -528,7 +545,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
- EnumSet<XAttrSetFlag> flag) throws IOException {
+ EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}
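
From a caller's perspective, the `open()` change above has two visible effects: failures now surface as an `IOException` rather than a silent `null`, and positioned reads go through `BufferedFSInputStream`, which keeps `getPos()` consistent. A minimal sketch, assuming a filer on localhost and a hypothetical file path:

```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionedReadSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.seaweed.filer.host", "localhost");
        conf.setInt("fs.seaweed.filer.port", 8888);

        Path file = new Path("/data/example.parquet"); // hypothetical path
        try (FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
             FSDataInputStream in = fs.open(file)) {
            long fileLen = fs.getFileStatus(file).getLen();
            byte[] footer = new byte[8];
            // Positioned read (PositionedReadable), as used for Parquet footers;
            // it does not disturb the stream's current position.
            in.readFully(fileLen - 8, footer);
        }
        // With this patch, a missing file or filer error raises IOException here
        // instead of open() returning null.
    }
}
```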
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index f65c1961b..c55d05797 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -59,19 +59,18 @@ public class SeaweedFileSystemStore {
}
public boolean createDirectory(final Path path, UserGroupInformation currentUser,
- final FsPermission permission, final FsPermission umask) {
+ final FsPermission permission, final FsPermission umask) {
LOG.debug("createDirectory path: {} permission: {} umask: {}",
- path,
- permission,
- umask);
+ path,
+ permission,
+ umask);
return filerClient.mkdirs(
- path.toUri().getPath(),
- permissionToMode(permission, true),
- currentUser.getUserName(),
- currentUser.getGroupNames()
- );
+ path.toUri().getPath(),
+ permissionToMode(permission, true),
+ currentUser.getUserName(),
+ currentUser.getGroupNames());
}
public FileStatus[] listEntries(final Path path) throws IOException {
@@ -84,7 +83,7 @@ public class SeaweedFileSystemStore {
}
if (!pathStatus.isDirectory()) {
- return new FileStatus[]{pathStatus};
+ return new FileStatus[] { pathStatus };
}
List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
@@ -116,9 +115,9 @@ public class SeaweedFileSystemStore {
public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
- path,
- String.valueOf(isDirectory),
- String.valueOf(recursive));
+ path,
+ String.valueOf(isDirectory),
+ String.valueOf(recursive));
if (path.isRoot()) {
return true;
@@ -146,7 +145,7 @@ public class SeaweedFileSystemStore {
String owner = attributes.getUserName();
String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
return new FileStatus(length, isDir, block_replication, blocksize,
- modification_time, access_time, permission, owner, group, null, path);
+ modification_time, access_time, permission, owner, group, null, path);
}
public FilerProto.Entry lookupEntry(Path path) {
@@ -162,27 +161,29 @@ public class SeaweedFileSystemStore {
if (source.isRoot()) {
return;
}
- LOG.info("rename source: {} destination:{}", source, destination);
+
FilerProto.Entry entry = lookupEntry(source);
if (entry == null) {
LOG.warn("rename non-existing source: {}", source);
return;
}
+
filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
}
public OutputStream createFile(final Path path,
- final boolean overwrite,
- FsPermission permission,
- int bufferSize,
- String replication) throws IOException {
+ final boolean overwrite,
+ FsPermission permission,
+ int bufferSize,
+ String replication) throws IOException {
permission = permission == null ? FsPermission.getFileDefault() : permission;
+
LOG.debug("createFile path: {} overwrite: {} permission: {}",
- path,
- overwrite,
- permission.toString());
+ path,
+ overwrite,
+ permission.toString());
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
long now = System.currentTimeMillis() / 1000L;
@@ -203,20 +204,21 @@ public class SeaweedFileSystemStore {
}
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
- .setName(path.getName())
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setFileMode(permissionToMode(permission, false))
- .setCrtime(now)
- .setMtime(now)
- .setUserName(userGroupInformation.getUserName())
- .clearGroupName()
- .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
- );
+ .setName(path.getName())
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(permissionToMode(permission, false))
+ .setCrtime(now)
+ .setMtime(now)
+ .setUserName(userGroupInformation.getUserName())
+ .clearGroupName()
+ .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())));
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
}
- return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
+
+ return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize,
+ replication);
}
@@ -231,9 +233,9 @@ public class SeaweedFileSystemStore {
}
return new SeaweedHadoopInputStream(filerClient,
- statistics,
- path.toUri().getPath(),
- entry);
+ statistics,
+ path.toUri().getPath(),
+ entry);
}
public void setOwner(Path path, String owner, String group) {
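
The `createFile()` path above is normally reached through the public `FileSystem` API. Below is a minimal sketch of driving it with an explicit permission, loosely based on the hdfs2 tests removed earlier in this diff; the path and permission value are placeholders.

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public final class CreateFileSketch {
    // Assumes 'fs' is an initialized SeaweedFileSystem instance.
    static void writeWithPermission(FileSystem fs) throws Exception {
        Path file = new Path("/test-hdfs3/hello.txt");        // placeholder path
        FsPermission perm = new FsPermission((short) 0644);   // placeholder mode
        try (FSDataOutputStream out = fs.create(file, perm, true /* overwrite */,
                4 * 1024 * 1024, (short) 1, 4 * 1024 * 1024, null)) {
            out.write("hello seaweedfs".getBytes(StandardCharsets.UTF_8));
        }
    }
}
```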
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
index f26eae597..8ac5a5ab4 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import seaweedfs.client.FilerClient;
@@ -11,12 +10,21 @@ import seaweedfs.client.SeaweedInputStream;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.ByteBuffer;
-public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+/**
+ * SeaweedFS Hadoop InputStream.
+ *
+ * NOTE: This class deliberately does NOT implement ByteBufferReadable,
+ * matching RawLocalFileSystem behavior. This ensures that
+ * BufferedFSInputStream is used as the wrapper, which correctly tracks
+ * the stream position for positioned reads
+ * (critical for Parquet and other formats).
+ */
+public class SeaweedHadoopInputStream extends FSInputStream {
private final SeaweedInputStream seaweedInputStream;
private final Statistics statistics;
+ private final String path;
public SeaweedHadoopInputStream(
final FilerClient filerClient,
@@ -25,6 +33,7 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
final FilerProto.Entry entry) throws IOException {
this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
this.statistics = statistics;
+ this.path = path;
}
@Override
@@ -37,20 +46,6 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
return seaweedInputStream.read(b, off, len);
}
- // implement ByteBufferReadable
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
- int bytesRead = seaweedInputStream.read(buf);
-
- if (bytesRead > 0) {
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
- }
-
- return bytesRead;
- }
-
/**
* Seek to given position in stream.
*
@@ -83,8 +78,10 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
}
/**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * Returns the length of the file that this stream refers to.
+ * Note that the length returned is the length as of the time
+ * the stream was opened. Specifically, if there have been
+ * subsequent appends to the file,
* they wont be reflected in the returned length.
*
* @return length of the file.
@@ -104,8 +101,12 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
return seaweedInputStream.getPos();
}
+ public String getPath() {
+ return path;
+ }
+
/**
- * Seeks a different copy of the data. Returns true if
+ * Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*
* @throws IOException throws {@link IOException} if there is an error
@@ -139,7 +140,9 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe
}
/**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
+ * Gets whether mark and reset are supported by
+ * {@code SeaweedHadoopInputStream}.
+ * Always returns {@code false}.
*
* @return always {@code false}
*/
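
Because the hdfs3 stream no longer advertises `ByteBufferReadable`, callers that previously went through `FSDataInputStream.read(ByteBuffer)` will now get `UnsupportedOperationException` from that method and must fall back to array reads. A defensive sketch (class and helper names are hypothetical):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;

public final class ByteBufferReadFallback {

    // Reads into 'buf' whether or not the wrapped stream supports ByteBufferReadable.
    static int readIntoBuffer(FSDataInputStream in, ByteBuffer buf) throws IOException {
        if (in.getWrappedStream() instanceof ByteBufferReadable) {
            return in.read(buf);                 // direct byte-buffer path
        }
        // Fallback: copy through a heap array when only byte[] reads are available.
        byte[] tmp = new byte[buf.remaining()];
        int n = in.read(tmp, 0, tmp.length);
        if (n > 0) {
            buf.put(tmp, 0, n);
        }
        return n;
    }
}
```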
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
index 1740312fe..a1a43820c 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -4,6 +4,8 @@ package seaweed.hdfs;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedOutputStream;
@@ -13,9 +15,12 @@ import java.util.Locale;
public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedHadoopOutputStream.class);
+
public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
+ final long position, final int bufferSize, final String replication) {
super(filerClient, path, entry, position, bufferSize, replication);
+
}
/**
@@ -26,6 +31,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
*/
@Override
public void hsync() throws IOException {
+
if (supportFlush) {
flushInternal();
}
@@ -39,6 +45,7 @@ public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Sy
*/
@Override
public void hflush() throws IOException {
+
if (supportFlush) {
flushInternal();
}