aboutsummaryrefslogtreecommitdiff
path: root/other/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client')
-rw-r--r--other/java/client/pom.xml4
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java93
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java33
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java75
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java41
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java104
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java303
-rw-r--r--other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java302
8 files changed, 732 insertions, 223 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 682582f7b..1989b9b05 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.seaweedfs</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>3.80</version>
+ <version>3.80.1-SNAPSHOT</version>
<name>SeaweedFS Java Client</name>
<description>A java client for SeaweedFS.</description>
@@ -33,7 +33,7 @@
<properties>
<protobuf.version>3.25.5</protobuf.version>
<!-- follow https://github.com/grpc/grpc-java -->
- <grpc.version>1.75.0</grpc.version>
+ <grpc.version>1.77.0</grpc.version>
<guava.version>32.0.0-jre</guava.version>
</properties>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 44977d186..320a754ea 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -8,10 +8,13 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class FilerGrpcClient {
@@ -29,10 +32,12 @@ public class FilerGrpcClient {
public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
protected int randomClientId;
- private final ManagedChannel channel;
- private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
- private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
- private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+
+ // Connection pool to handle concurrent requests
+ private static final int CHANNEL_POOL_SIZE = 4;
+ private final List<ManagedChannel> channelPool;
+ private final AtomicInteger channelIndex = new AtomicInteger(0);
+
private boolean cipher = false;
private String collection = "";
private String replication = "";
@@ -45,26 +50,18 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) {
- this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort)
- .usePlaintext()
- .maxInboundMessageSize(1024 * 1024 * 1024) :
- cn.isEmpty() ?
- NettyChannelBuilder.forAddress(host, grpcPort)
- .maxInboundMessageSize(1024 * 1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- .sslContext(sslContext) :
- NettyChannelBuilder.forAddress(host, grpcPort)
- .maxInboundMessageSize(1024 * 1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- .overrideAuthority(cn) //will not check hostname of the filer server
- .sslContext(sslContext)
- );
-
filerAddress = SeaweedUtil.joinHostPort(host, port);
- FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
- this.getBlockingStub().getFilerConfiguration(
+ // Create a pool of channels for better concurrency handling
+ channelPool = new ArrayList<>(CHANNEL_POOL_SIZE);
+
+ for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
+ channelPool.add(createChannelBuilder(host, grpcPort, sslContext, cn).build());
+ }
+
+ // Get filer configuration using first channel
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc
+ .newBlockingStub(channelPool.get(0)).getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
cipher = filerConfigurationResponse.getCipher();
collection = filerConfigurationResponse.getCollection();
@@ -73,11 +70,39 @@ public class FilerGrpcClient {
}
- private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
- channel = channelBuilder.build();
- blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
- asyncStub = SeaweedFilerGrpc.newStub(channel);
- futureStub = SeaweedFilerGrpc.newFutureStub(channel);
+ /**
+ * Creates a NettyChannelBuilder with common gRPC configuration.
+ * Supports plaintext and TLS modes with optional authority override.
+ */
+ private NettyChannelBuilder createChannelBuilder(String host, int grpcPort, SslContext sslContext, String cn) {
+ NettyChannelBuilder builder = NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
+ .maxInboundMetadataSize(1024 * 1024)
+ .flowControlWindow(16 * 1024 * 1024)
+ .initialFlowControlWindow(16 * 1024 * 1024)
+ .maxHeaderListSize(16 * 1024 * 1024)
+ .keepAliveTime(30, TimeUnit.SECONDS)
+ .keepAliveTimeout(10, TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(true)
+ .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
+ .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024);
+
+ if (sslContext == null) {
+ builder.usePlaintext();
+ } else {
+ builder.negotiationType(NegotiationType.TLS).sslContext(sslContext);
+ if (!cn.isEmpty()) {
+ builder.overrideAuthority(cn);
+ }
+ }
+ return builder;
+ }
+
+ // Get a channel from the pool using round-robin
+ private ManagedChannel getChannel() {
+ int raw = channelIndex.getAndIncrement();
+ int index = Math.floorMod(raw, CHANNEL_POOL_SIZE);
+ return channelPool.get(index);
}
public boolean isCipher() {
@@ -93,19 +118,25 @@ public class FilerGrpcClient {
}
public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ for (ManagedChannel channel : channelPool) {
+ channel.shutdown();
+ }
+ for (ManagedChannel channel : channelPool) {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ }
}
public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() {
- return blockingStub;
+ // Return a new stub using a channel from the pool (round-robin)
+ return SeaweedFilerGrpc.newBlockingStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() {
- return asyncStub;
+ return SeaweedFilerGrpc.newStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
- return futureStub;
+ return SeaweedFilerGrpc.newFutureStub(getChannel());
}
public void setAccessVolumeServerDirectly() {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 64754321b..48a508db0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream {
private final long contentLength;
private FilerProto.Entry entry;
- private long position = 0; // cursor of the file
+ private long position = 0; // cursor of the file
private boolean closed = false;
@@ -44,7 +44,6 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
-
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -64,7 +63,6 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
-
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -99,7 +97,8 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (len > (b.length - off)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ throw new IllegalArgumentException(
+ "requested read length is more than will fit after requested offset in buffer");
}
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
@@ -114,21 +113,30 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
+ return -1; // Hadoop prefers -1 to EOFException
}
long bytesRead = 0;
int len = buf.remaining();
- if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) {
- entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf);
+ if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) {
+ entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf);
+ bytesRead = len; // FIX: Update bytesRead after inline copy
} else {
- bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ // Use the known contentLength instead of recomputing from the entry to avoid
+ // races
+ bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf,
+ this.contentLength);
}
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
+ // Clamp premature EOFs: do not return -1 unless position >= contentLength
+ if (bytesRead < 0 && position < contentLength) {
+ bytesRead = 0;
+ }
+
if (bytesRead > 0) {
this.position += bytesRead;
}
@@ -188,12 +196,15 @@ public class SeaweedInputStream extends InputStream {
}
final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
+ ? (int) remaining
+ : Integer.MAX_VALUE;
}
/**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * Returns the length of the file that this stream refers to. Note that the
+ * length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been
+ * subsequent appends to the file,
* they wont be reflected in the returned length.
*
* @return length of the file.
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 68c281992..ea4c99805 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -23,15 +23,13 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry;
- private long position;
+ private long position; // Flushed bytes (committed to service)
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private ByteBuffer buffer;
- private long outputIndex;
private String replication = "";
private String collection = "";
@@ -44,7 +42,8 @@ public class SeaweedOutputStream extends OutputStream {
}
public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
+ final long position, final int bufferSize, final String replication) {
+
this.filerClient = filerClient;
this.replication = replication;
this.path = path;
@@ -58,8 +57,7 @@ public class SeaweedOutputStream extends OutputStream {
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
120L,
TimeUnit.SECONDS,
@@ -77,8 +75,7 @@ public class SeaweedOutputStream extends OutputStream {
.setFileMode(0755)
.setCrtime(now)
.setMtime(now)
- .clearGroupName()
- );
+ .clearGroupName());
}
}
@@ -86,14 +83,35 @@ public class SeaweedOutputStream extends OutputStream {
public void setReplication(String replication) {
this.replication = replication;
}
+
public void setCollection(String collection) {
this.collection = collection;
}
+ /**
+ * Get the current position in the output stream.
+ * This returns the total position including both flushed and buffered data.
+ *
+ * @return current position (flushed + buffered bytes)
+ */
+ public synchronized long getPos() throws IOException {
+ // Guard against NPE if called after close()
+ if (buffer == null) {
+ return position;
+ }
+
+ // Return current position (flushed + buffered)
+ return position + buffer.position();
+ }
+
public static String getParentDirectory(String path) {
int protoIndex = path.indexOf("://");
if (protoIndex >= 0) {
- int pathStart = path.indexOf("/", protoIndex+3);
+ int pathStart = path.indexOf("/", protoIndex + 3);
+ if (pathStart < 0) {
+ // No path segment; treat as root (e.g., "seaweedfs://host")
+ return "/";
+ }
path = path.substring(pathStart);
}
if (path.equals("/")) {
@@ -116,6 +134,13 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
+
+ // Set the file size in attributes based on our position
+ // This ensures Parquet footer metadata matches what we actually wrote
+ FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder();
+ attrBuilder.setFileSize(offset);
+ entry.setAttributes(attrBuilder);
+
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) {
throw new IOException(ex);
@@ -125,7 +150,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
+ write(new byte[] { (byte) (byteVal & 0xFF) });
}
@Override
@@ -141,8 +166,6 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
- // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
int currentOffset = off;
int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
@@ -154,9 +177,11 @@ public class SeaweedOutputStream extends OutputStream {
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," +
+ // ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
currentOffset += writableBytes;
+
writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
writableBytes = bufferSize - buffer.position();
@@ -191,7 +216,6 @@ public class SeaweedOutputStream extends OutputStream {
return;
}
- LOG.debug("close path: {}", path);
try {
flushInternal();
threadExecutor.shutdown();
@@ -209,28 +233,35 @@ public class SeaweedOutputStream extends OutputStream {
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (buffer.position() == 0) {
+ int bufferPos = buffer.position();
+
+ if (bufferPos == 0) {
return;
}
- position += submitWriteBufferToService(buffer, position);
+ int written = submitWriteBufferToService(buffer, position);
+ position += written;
buffer = ByteBufferPool.request(bufferSize);
}
- private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
+ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition)
+ throws IOException {
- ((Buffer)bufferToWrite).flip();
+ ((Buffer) bufferToWrite).flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
- // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
- // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ // System.out.println(path + " is going to save [" + (writePosition) + "," +
+ // ((writePosition) + bytesLength) + ")");
+ SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(),
+ bufferToWrite.position(), bufferToWrite.limit(), path);
+ // System.out.println(path + " saved [" + (writePosition) + "," +
+ // ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite);
return null;
});
@@ -318,12 +349,10 @@ public class SeaweedOutputStream extends OutputStream {
private static class WriteOperation {
private final Future<Void> task;
- private final long startOffset;
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
this.task = task;
- this.startOffset = startOffset;
this.length = length;
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index cac85d186..3fd184671 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -24,9 +24,10 @@ public class SeaweedRead {
// returns bytesRead
public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
- final long position, final ByteBuffer buf, final long fileSize) throws IOException {
+ final long position, final ByteBuffer buf, final long fileSize) throws IOException {
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
+ int originalRemaining = buf.remaining();
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, originalRemaining);
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
@@ -51,7 +52,7 @@ public class SeaweedRead {
}
}
- //TODO parallel this
+ // TODO parallel this
long readCount = 0;
long startOffset = position;
for (ChunkView chunkView : chunkViews) {
@@ -59,7 +60,7 @@ public class SeaweedRead {
if (startOffset < chunkView.logicOffset) {
long gap = chunkView.logicOffset - startOffset;
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
- buf.position(buf.position()+ (int)gap);
+ buf.position(buf.position() + (int) gap);
readCount += gap;
startOffset += gap;
}
@@ -81,12 +82,17 @@ public class SeaweedRead {
}
- long limit = Math.min(buf.limit(), fileSize);
+ // Fix: Calculate the correct limit based on the read position and requested
+ // size,
+ // not the buffer's absolute limit. This fixes the 78-byte EOF error when
+ // seeking
+ // near the end of the file.
+ long limit = Math.min(position + originalRemaining, fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
- buf.position(buf.position()+ (int)gap);
+ buf.position(buf.position() + (int) gap);
readCount += gap;
startOffset += gap;
}
@@ -94,7 +100,8 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView,
+ FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -105,13 +112,15 @@ public class SeaweedRead {
int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset);
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset,
+ chunkView.logicOffset + chunkView.size, startOffset);
buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
return len;
}
- public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView,
+ FilerProto.Locations locations) throws IOException {
byte[] data = null;
IOException lastException = null;
@@ -214,8 +223,7 @@ public class SeaweedRead {
chunkStart,
isFullChunk,
chunk.cipherKey,
- chunk.isCompressed
- ));
+ chunk.isCompressed));
}
}
return views;
@@ -239,7 +247,10 @@ public class SeaweedRead {
}
public static long fileSize(FilerProto.Entry entry) {
- return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ long chunksSize = totalSize(entry.getChunksList());
+ long attrSize = entry.getAttributes().getFileSize();
+ long finalSize = Math.max(chunksSize, attrSize);
+ return finalSize;
}
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
@@ -263,7 +274,8 @@ public class SeaweedRead {
public final byte[] cipherKey;
public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset,
+ boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
@@ -297,7 +309,8 @@ public class SeaweedRead {
public final byte[] cipherKey;
public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey,
+ boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 88c7cefbe..0fadd53cc 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -2,6 +2,7 @@ package seaweedfs.client;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
+import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -25,13 +26,13 @@ public class SeaweedWrite {
private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
- final String replication,
- String collection,
- final FilerClient filerClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength,
- final String path) throws IOException {
+ final String replication,
+ String collection,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength,
+ final String path) throws IOException {
IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
@@ -60,21 +61,50 @@ public class SeaweedWrite {
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
- final String collection,
- final FilerClient filerClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset,
- final long bytesLength,
- final String path) throws IOException {
- FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
- FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
- .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
- .setDataCenter("")
- .setTtlSec(0)
- .setPath(path)
- .build());
+ final String collection,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength,
+ final String path) throws IOException {
+
+ // Retry assignVolume call for transient network/server errors
+ FilerProto.AssignVolumeResponse response = null;
+ IOException lastException = null;
+ int maxRetries = 3;
+
+ for (int attempt = 0; attempt < maxRetries; attempt++) {
+ try {
+ response = filerClient.getBlockingStub().assignVolume(
+ FilerProto.AssignVolumeRequest.newBuilder()
+ .setCollection(
+ Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
+ .setReplication(
+ Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
+ .setDataCenter("")
+ .setTtlSec(0)
+ .setPath(path)
+ .build());
+ break; // Success, exit retry loop
+ } catch (io.grpc.StatusRuntimeException e) {
+ lastException = new IOException(
+ "assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e);
+ if (attempt < maxRetries - 1) {
+ try {
+ Thread.sleep(100 * (attempt + 1)); // Exponential backoff: 100ms, 200ms, 300ms
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted during retry", ie);
+ }
+ }
+ }
+ }
+
+ if (response == null) {
+ throw lastException != null ? lastException
+ : new IOException("assignVolume failed after " + maxRetries + " attempts");
+ }
if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(response.getError());
@@ -83,7 +113,8 @@ public class SeaweedWrite {
String fileId = response.getFileId();
String auth = response.getAuth();
- String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl());
+ String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(),
+ response.getLocation().getPublicUrl());
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
@@ -94,8 +125,6 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
-
return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
@@ -106,27 +135,28 @@ public class SeaweedWrite {
}
public static void writeMeta(final FilerClient filerClient,
- final String parentDirectory,
- final FilerProto.Entry.Builder entry) throws IOException {
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
synchronized (entry) {
- List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(),
+ parentDirectory);
+
entry.clearChunks();
entry.addAllChunks(chunks);
filerClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
.setEntry(entry)
- .build()
- );
+ .build());
}
}
private static String multipartUpload(String targetUrl,
- String auth,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength,
- byte[] cipherKey) throws IOException {
+ String auth,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
MessageDigest md = null;
try {
md = MessageDigest.getInstance("MD5");
@@ -162,8 +192,10 @@ public class SeaweedWrite {
try {
if (response.getStatusLine().getStatusCode() / 100 != 2) {
- if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) {
- throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8"));
+ HttpEntity entity = response.getEntity();
+ if (entity != null && entity.getContentType() != null
+ && entity.getContentType().getValue().equals("application/json")) {
+ throw new IOException(EntityUtils.toString(entity, "UTF-8"));
} else {
throw new IOException(response.getStatusLine().getReasonPhrase());
}
diff --git a/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java
new file mode 100644
index 000000000..d49e17e72
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java
@@ -0,0 +1,303 @@
+package seaweedfs.client;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test to reproduce the Parquet EOF issue.
+ *
+ * The issue: When Parquet writes column chunks, it calls getPos() to record
+ * offsets.
+ * If getPos() returns a position that doesn't include buffered (unflushed)
+ * data,
+ * the footer metadata will have incorrect offsets.
+ *
+ * This test simulates Parquet's behavior:
+ * 1. Write some data (column chunk 1)
+ * 2. Call getPos() - Parquet records this as the END of chunk 1
+ * 3. Write more data (column chunk 2)
+ * 4. Call getPos() - Parquet records this as the END of chunk 2
+ * 5. Close the file
+ * 6. Verify that the recorded positions match the actual file content
+ *
+ * Prerequisites:
+ * - SeaweedFS master, volume server, and filer must be running
+ * - Default ports: filer HTTP 8888, filer gRPC 18888
+ *
+ * To run:
+ * export SEAWEEDFS_TEST_ENABLED=true
+ * cd other/java/client
+ * mvn test -Dtest=GetPosBufferTest
+ */
+public class GetPosBufferTest {
+
+ private FilerClient filerClient;
+ private static final String TEST_ROOT = "/test-getpos-buffer";
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+
+ @Before
+ public void setUp() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+
+ String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+ String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
+
+ filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort));
+
+ // Clean up any existing test directory
+ if (filerClient.exists(TEST_ROOT)) {
+ filerClient.rm(TEST_ROOT, true, true);
+ }
+
+ // Create test root directory
+ filerClient.mkdirs(TEST_ROOT, 0755);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ if (filerClient != null) {
+ filerClient.rm(TEST_ROOT, true, true);
+ filerClient.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetPosWithBufferedData() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with buffered data ===");
+
+ String testPath = TEST_ROOT + "/getpos-test.bin";
+
+ // Simulate what Parquet does when writing column chunks
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write "column chunk 1" - 100 bytes
+ byte[] chunk1 = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ chunk1[i] = (byte) i;
+ }
+ outputStream.write(chunk1);
+
+ // Parquet calls getPos() here to record end of chunk 1
+ long posAfterChunk1 = outputStream.getPos();
+ System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1);
+ assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1);
+
+ // Write "column chunk 2" - 200 bytes
+ byte[] chunk2 = new byte[200];
+ for (int i = 0; i < 200; i++) {
+ chunk2[i] = (byte) (i + 100);
+ }
+ outputStream.write(chunk2);
+
+ // Parquet calls getPos() here to record end of chunk 2
+ long posAfterChunk2 = outputStream.getPos();
+ System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2);
+ assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2);
+
+ // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!)
+ byte[] chunk3 = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ chunk3[i] = (byte) (i + 50);
+ }
+ outputStream.write(chunk3);
+
+ // Parquet calls getPos() here to record end of chunk 3
+ long posAfterChunk3 = outputStream.getPos();
+ System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3);
+ assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3);
+
+ // Close to flush everything
+ outputStream.close();
+ System.out.println("File closed successfully");
+
+ // Now read the file and verify its actual size matches what getPos() reported
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ long actualFileSize = SeaweedRead.fileSize(entry);
+ System.out.println("Actual file size on disk: " + actualFileSize);
+
+ assertEquals("File size should match the last getPos() value", 378, actualFileSize);
+
+ // Now read the file and verify we can read all the data
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ byte[] readBuffer = new byte[500]; // Larger buffer to read everything
+ int totalRead = 0;
+ int bytesRead;
+ while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) {
+ totalRead += bytesRead;
+ }
+ inputStream.close();
+
+ System.out.println("Total bytes read: " + totalRead);
+ assertEquals("Should read exactly 378 bytes", 378, totalRead);
+
+ // Verify the data is correct
+ for (int i = 0; i < 100; i++) {
+ assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]);
+ }
+ for (int i = 0; i < 200; i++) {
+ assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]);
+ }
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]);
+ }
+
+ System.out.println("SUCCESS: All data verified correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithSmallWrites() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ===");
+
+ String testPath = TEST_ROOT + "/small-writes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Parquet writes column data in small chunks and frequently calls getPos()
+ String[] columnData = { "Alice", "Bob", "Charlie", "David" };
+ long[] recordedPositions = new long[columnData.length];
+
+ for (int i = 0; i < columnData.length; i++) {
+ byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8);
+ outputStream.write(data);
+
+ // Parquet calls getPos() after each value to track offsets
+ recordedPositions[i] = outputStream.getPos();
+ System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]);
+ }
+
+ long finalPos = outputStream.getPos();
+ System.out.println("Final position before close: " + finalPos);
+
+ outputStream.close();
+
+ // Verify file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size should match final getPos()", finalPos, actualFileSize);
+
+ // Verify we can read using the recorded positions
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ long currentPos = 0;
+ for (int i = 0; i < columnData.length; i++) {
+ long nextPos = recordedPositions[i];
+ int length = (int) (nextPos - currentPos);
+
+ byte[] buffer = new byte[length];
+ int bytesRead = inputStream.read(buffer, 0, length);
+
+ assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead);
+
+ String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
+ System.out.println("Read at offset " + currentPos + ": '" + readData + "'");
+ assertEquals("Data mismatch", columnData[i], readData);
+
+ currentPos = nextPos;
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithExactly78BytesBuffered() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ===");
+
+ String testPath = TEST_ROOT + "/78-bytes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write some initial data
+ byte[] initial = new byte[1000];
+ for (int i = 0; i < 1000; i++) {
+ initial[i] = (byte) i;
+ }
+ outputStream.write(initial);
+ outputStream.flush(); // Ensure this is flushed
+
+ long posAfterFlush = outputStream.getPos();
+ System.out.println("Position after 1000 bytes + flush: " + posAfterFlush);
+ assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush);
+
+ // Now write EXACTLY 78 bytes (the problematic buffer size in our bug)
+ byte[] problematicChunk = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ problematicChunk[i] = (byte) (i + 50);
+ }
+ outputStream.write(problematicChunk);
+
+ // DO NOT FLUSH - this is the bug scenario!
+ // Parquet calls getPos() here while the 78 bytes are still buffered
+ long posWithBufferedData = outputStream.getPos();
+ System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData);
+
+ // This MUST return 1078, not 1000!
+ assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData);
+
+ // Now close (which will flush)
+ outputStream.close();
+
+ // Verify actual file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size must be 1078", 1078, actualFileSize);
+
+ // Try to read at position 1000 for 78 bytes (what Parquet would try)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1000);
+
+ byte[] readBuffer = new byte[78];
+ int bytesRead = inputStream.read(readBuffer, 0, 78);
+
+ System.out.println("Bytes read at position 1000: " + bytesRead);
+ assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead);
+
+ // Verify the data matches
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]);
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: getPos() correctly includes buffered data!\n");
+ }
+}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
index f384e059f..3cfb2ce9e 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
@@ -28,22 +28,21 @@ public class SeaweedStreamIntegrationTest {
private FilerClient filerClient;
private static final String TEST_ROOT = "/test-stream-integration";
- private static final boolean TESTS_ENABLED =
- "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
@Before
public void setUp() throws Exception {
if (!TESTS_ENABLED) {
return;
}
-
+
filerClient = new FilerClient("localhost", 18888);
-
+
// Clean up any existing test directory
if (filerClient.exists(TEST_ROOT)) {
filerClient.rm(TEST_ROOT, true, true);
}
-
+
// Create test root directory
filerClient.mkdirs(TEST_ROOT, 0755);
}
@@ -53,7 +52,7 @@ public class SeaweedStreamIntegrationTest {
if (!TESTS_ENABLED || filerClient == null) {
return;
}
-
+
try {
// Clean up test directory
if (filerClient.exists(TEST_ROOT)) {
@@ -70,30 +69,29 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/small.txt";
String testContent = "Hello, SeaweedFS!";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[testContent.length()];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read all bytes", testContent.length(), bytesRead);
assertEquals("Content should match", testContent, new String(buffer, StandardCharsets.UTF_8));
}
@@ -104,43 +102,42 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/large.bin";
int fileSize = 10 * 1024 * 1024; // 10 MB
-
+
// Generate random data
byte[] originalData = new byte[fileSize];
new Random(42).nextBytes(originalData); // Use seed for reproducibility
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(originalData);
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
// Read file in chunks to handle large files properly
byte[] readData = new byte[fileSize];
int totalRead = 0;
int bytesRead;
byte[] buffer = new byte[8192]; // Read in 8KB chunks
-
+
while ((bytesRead = inputStream.read(buffer)) > 0) {
System.arraycopy(buffer, 0, readData, totalRead, bytesRead);
totalRead += bytesRead;
}
inputStream.close();
-
+
assertEquals("Should read all bytes", fileSize, totalRead);
assertArrayEquals("Content should match", originalData, readData);
}
@@ -151,31 +148,30 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/chunked.txt";
- String[] chunks = {"First chunk. ", "Second chunk. ", "Third chunk."};
-
+ String[] chunks = { "First chunk. ", "Second chunk. ", "Third chunk." };
+
// Write file in chunks
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
for (String chunk : chunks) {
outputStream.write(chunk.getBytes(StandardCharsets.UTF_8));
}
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
String expected = String.join("", chunks);
String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
-
+
assertEquals("Content should match", expected, actual);
}
@@ -185,31 +181,30 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/offset.txt";
String testContent = "0123456789ABCDEFGHIJ";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read with offset
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
inputStream.seek(10); // Skip first 10 bytes
-
+
byte[] buffer = new byte[10];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 10 bytes", 10, bytesRead);
- assertEquals("Should read from offset", "ABCDEFGHIJ",
- new String(buffer, StandardCharsets.UTF_8));
+ assertEquals("Should read from offset", "ABCDEFGHIJ",
+ new String(buffer, StandardCharsets.UTF_8));
}
@Test
@@ -218,32 +213,31 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/partial.txt";
String testContent = "The quick brown fox jumps over the lazy dog";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read partial
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
// Read only "quick brown"
inputStream.seek(4);
byte[] buffer = new byte[11];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 11 bytes", 11, bytesRead);
- assertEquals("Should read partial content", "quick brown",
- new String(buffer, StandardCharsets.UTF_8));
+ assertEquals("Should read partial content", "quick brown",
+ new String(buffer, StandardCharsets.UTF_8));
}
@Test
@@ -252,28 +246,27 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/empty.txt";
-
+
// Write empty file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read empty file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[100];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 0 bytes from empty file", -1, bytesRead);
}
@@ -283,32 +276,31 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/overwrite.txt";
String originalContent = "Original content";
String newContent = "New content that overwrites the original";
-
+
// Write original file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(originalContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Overwrite file
outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(newContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
assertEquals("Should have new content", newContent, actual);
}
@@ -319,23 +311,22 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/multireads.txt";
String testContent = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read in multiple small chunks
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
StringBuilder result = new StringBuilder();
byte[] buffer = new byte[5];
int bytesRead;
@@ -343,7 +334,7 @@ public class SeaweedStreamIntegrationTest {
result.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
inputStream.close();
-
+
assertEquals("Should read entire content", testContent, result.toString());
}
@@ -353,29 +344,28 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/binary.bin";
byte[] binaryData = new byte[256];
for (int i = 0; i < 256; i++) {
binaryData[i] = (byte) i;
}
-
+
// Write binary file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(binaryData);
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] readData = new byte[256];
int bytesRead = inputStream.read(readData);
inputStream.close();
-
+
assertEquals("Should read all bytes", 256, bytesRead);
assertArrayEquals("Binary data should match", binaryData, readData);
}
@@ -386,32 +376,132 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/flush.txt";
String testContent = "Content to flush";
-
+
// Write file with flush
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.flush(); // Explicitly flush
outputStream.close();
-
+
// Verify file was written
assertTrue("File should exist after flush", filerClient.exists(testPath));
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[testContent.length()];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
- assertEquals("Content should match", testContent,
- new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
+
+ assertEquals("Content should match", testContent,
+ new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
-}
+ /**
+ * Tests range reads similar to how Parquet reads column chunks.
+ * This simulates:
+ * 1. Seeking to specific offsets
+ * 2. Reading specific byte ranges
+ * 3. Verifying each read() call returns the correct number of bytes
+ *
+ * This test specifically addresses the bug where read() was returning 0
+ * for inline content or -1 prematurely for chunked reads.
+ */
+ @Test
+ public void testRangeReads() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ String testPath = TEST_ROOT + "/rangereads.dat";
+
+ // Create a 1275-byte file (similar to the Parquet file size that was failing)
+ byte[] testData = new byte[1275];
+ Random random = new Random(42); // Fixed seed for reproducibility
+ random.nextBytes(testData);
+
+ // Write file
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+ outputStream.write(testData);
+ outputStream.close();
+
+ // Read file entry
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ // Test 1: Read last 8 bytes (like reading Parquet footer length)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1267);
+ byte[] buffer = new byte[8];
+ int bytesRead = inputStream.read(buffer, 0, 8);
+ assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead);
+ assertArrayEquals("Content at offset 1267 should match",
+ Arrays.copyOfRange(testData, 1267, 1275), buffer);
+ inputStream.close();
+
+ // Test 2: Read large chunk in middle (like reading column data)
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(383);
+ buffer = new byte[884]; // Read bytes 383-1267
+ bytesRead = inputStream.read(buffer, 0, 884);
+ assertEquals("Should read 884 bytes at offset 383", 884, bytesRead);
+ assertArrayEquals("Content at offset 383 should match",
+ Arrays.copyOfRange(testData, 383, 1267), buffer);
+ inputStream.close();
+
+ // Test 3: Read from beginning (like reading Parquet magic bytes)
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ buffer = new byte[4];
+ bytesRead = inputStream.read(buffer, 0, 4);
+ assertEquals("Should read 4 bytes at offset 0", 4, bytesRead);
+ assertArrayEquals("Content at offset 0 should match",
+ Arrays.copyOfRange(testData, 0, 4), buffer);
+ inputStream.close();
+
+ // Test 4: Multiple sequential reads without seeking (like
+ // H2SeekableInputStream.readFully)
+ // This is the critical test case that was failing!
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1197); // Position where EOF was being returned prematurely
+
+ byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes
+ int totalRead = 0;
+ int offset = 0;
+ int remaining = 78;
+
+ // Simulate Parquet's H2SeekableInputStream.readFully() loop
+ while (remaining > 0) {
+ int read = inputStream.read(fullBuffer, offset, remaining);
+ if (read == -1) {
+ fail(String.format(
+ "Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)",
+ totalRead, remaining));
+ }
+ assertTrue("Each read() should return positive bytes", read > 0);
+ totalRead += read;
+ offset += read;
+ remaining -= read;
+ }
+
+ assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead);
+ assertArrayEquals("Content at offset 1197 should match",
+ Arrays.copyOfRange(testData, 1197, 1275), fullBuffer);
+ inputStream.close();
+
+ // Test 5: Read entire file in one go
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ byte[] allData = new byte[1275];
+ bytesRead = inputStream.read(allData, 0, 1275);
+ assertEquals("Should read entire 1275 bytes", 1275, bytesRead);
+ assertArrayEquals("Entire content should match", testData, allData);
+ inputStream.close();
+ }
+}