diff options
Diffstat (limited to 'other/java/client')
8 files changed, 732 insertions, 223 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 682582f7b..1989b9b05 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ <groupId>com.seaweedfs</groupId> <artifactId>seaweedfs-client</artifactId> - <version>3.80</version> + <version>3.80.1-SNAPSHOT</version> <name>SeaweedFS Java Client</name> <description>A java client for SeaweedFS.</description> @@ -33,7 +33,7 @@ <properties> <protobuf.version>3.25.5</protobuf.version> <!-- follow https://github.com/grpc/grpc-java --> - <grpc.version>1.75.0</grpc.version> + <grpc.version>1.77.0</grpc.version> <guava.version>32.0.0-jre</guava.version> </properties> diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 44977d186..320a754ea 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -8,10 +8,13 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class FilerGrpcClient { @@ -29,10 +32,12 @@ public class FilerGrpcClient { public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>(); protected int randomClientId; - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + + // Connection pool to handle concurrent requests + private static final int CHANNEL_POOL_SIZE = 4; + private final List<ManagedChannel> channelPool; + private final AtomicInteger channelIndex = new AtomicInteger(0); + private boolean cipher = false; private String collection = ""; private String replication = ""; @@ -45,26 +50,18 @@ public class FilerGrpcClient { public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) { - this(sslContext == null ? - ManagedChannelBuilder.forAddress(host, grpcPort) - .usePlaintext() - .maxInboundMessageSize(1024 * 1024 * 1024) : - cn.isEmpty() ? - NettyChannelBuilder.forAddress(host, grpcPort) - .maxInboundMessageSize(1024 * 1024 * 1024) - .negotiationType(NegotiationType.TLS) - .sslContext(sslContext) : - NettyChannelBuilder.forAddress(host, grpcPort) - .maxInboundMessageSize(1024 * 1024 * 1024) - .negotiationType(NegotiationType.TLS) - .overrideAuthority(cn) //will not check hostname of the filer server - .sslContext(sslContext) - ); - filerAddress = SeaweedUtil.joinHostPort(host, port); - FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = - this.getBlockingStub().getFilerConfiguration( + // Create a pool of channels for better concurrency handling + channelPool = new ArrayList<>(CHANNEL_POOL_SIZE); + + for (int i = 0; i < CHANNEL_POOL_SIZE; i++) { + channelPool.add(createChannelBuilder(host, grpcPort, sslContext, cn).build()); + } + + // Get filer configuration using first channel + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc + .newBlockingStub(channelPool.get(0)).getFilerConfiguration( FilerProto.GetFilerConfigurationRequest.newBuilder().build()); cipher = filerConfigurationResponse.getCipher(); collection = filerConfigurationResponse.getCollection(); @@ -73,11 +70,39 @@ public class FilerGrpcClient { } - private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { - channel = channelBuilder.build(); - blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); - asyncStub = SeaweedFilerGrpc.newStub(channel); - futureStub = SeaweedFilerGrpc.newFutureStub(channel); + /** + * Creates a NettyChannelBuilder with common gRPC configuration. + * Supports plaintext and TLS modes with optional authority override. + */ + private NettyChannelBuilder createChannelBuilder(String host, int grpcPort, SslContext sslContext, String cn) { + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(host, grpcPort) + .maxInboundMessageSize(1024 * 1024 * 1024) + .maxInboundMetadataSize(1024 * 1024) + .flowControlWindow(16 * 1024 * 1024) + .initialFlowControlWindow(16 * 1024 * 1024) + .maxHeaderListSize(16 * 1024 * 1024) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024); + + if (sslContext == null) { + builder.usePlaintext(); + } else { + builder.negotiationType(NegotiationType.TLS).sslContext(sslContext); + if (!cn.isEmpty()) { + builder.overrideAuthority(cn); + } + } + return builder; + } + + // Get a channel from the pool using round-robin + private ManagedChannel getChannel() { + int raw = channelIndex.getAndIncrement(); + int index = Math.floorMod(raw, CHANNEL_POOL_SIZE); + return channelPool.get(index); } public boolean isCipher() { @@ -93,19 +118,25 @@ public class FilerGrpcClient { } public void shutdown() throws InterruptedException { - channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + for (ManagedChannel channel : channelPool) { + channel.shutdown(); + } + for (ManagedChannel channel : channelPool) { + channel.awaitTermination(5, TimeUnit.SECONDS); + } } public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() { - return blockingStub; + // Return a new stub using a channel from the pool (round-robin) + return SeaweedFilerGrpc.newBlockingStub(getChannel()); } public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() { - return asyncStub; + return SeaweedFilerGrpc.newStub(getChannel()); } public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { - return futureStub; + return SeaweedFilerGrpc.newFutureStub(getChannel()); } public void setAccessVolumeServerDirectly() { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 64754321b..48a508db0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream { private final long contentLength; private FilerProto.Entry entry; - private long position = 0; // cursor of the file + private long position = 0; // cursor of the file private boolean closed = false; @@ -44,7 +44,6 @@ public class SeaweedInputStream extends InputStream { } this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -64,7 +63,6 @@ public class SeaweedInputStream extends InputStream { } this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -99,7 +97,8 @@ public class SeaweedInputStream extends InputStream { throw new IllegalArgumentException("requested read length is less than zero"); } if (len > (b.length - off)) { - throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); + throw new IllegalArgumentException( + "requested read length is more than will fit after requested offset in buffer"); } ByteBuffer buf = ByteBuffer.wrap(b, off, len); @@ -114,21 +113,30 @@ public class SeaweedInputStream extends InputStream { throw new IllegalArgumentException("attempting to read from negative offset"); } if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException + return -1; // Hadoop prefers -1 to EOFException } long bytesRead = 0; int len = buf.remaining(); - if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) { - entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf); + if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) { + entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf); + bytesRead = len; // FIX: Update bytesRead after inline copy } else { - bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + // Use the known contentLength instead of recomputing from the entry to avoid + // races + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, + this.contentLength); } if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + // Clamp premature EOFs: do not return -1 unless position >= contentLength + if (bytesRead < 0 && position < contentLength) { + bytesRead = 0; + } + if (bytesRead > 0) { this.position += bytesRead; } @@ -188,12 +196,15 @@ public class SeaweedInputStream extends InputStream { } final long remaining = this.contentLength - this.position; return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining + : Integer.MAX_VALUE; } /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * Returns the length of the file that this stream refers to. Note that the + * length returned is the length + * as of the time the Stream was opened. Specifically, if there have been + * subsequent appends to the file, * they wont be reflected in the returned length. * * @return length of the file. diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 68c281992..ea4c99805 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -23,15 +23,13 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; private final ConcurrentLinkedDeque<WriteOperation> writeOperations; - private final boolean shouldSaveMetadata = false; private FilerProto.Entry.Builder entry; - private long position; + private long position; // Flushed bytes (committed to service) private boolean closed; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; private ByteBuffer buffer; - private long outputIndex; private String replication = ""; private String collection = ""; @@ -44,7 +42,8 @@ public class SeaweedOutputStream extends OutputStream { } public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { + final long position, final int bufferSize, final String replication) { + this.filerClient = filerClient; this.replication = replication; this.path = path; @@ -58,8 +57,7 @@ public class SeaweedOutputStream extends OutputStream { this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, + this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, 120L, TimeUnit.SECONDS, @@ -77,8 +75,7 @@ public class SeaweedOutputStream extends OutputStream { .setFileMode(0755) .setCrtime(now) .setMtime(now) - .clearGroupName() - ); + .clearGroupName()); } } @@ -86,14 +83,35 @@ public class SeaweedOutputStream extends OutputStream { public void setReplication(String replication) { this.replication = replication; } + public void setCollection(String collection) { this.collection = collection; } + /** + * Get the current position in the output stream. + * This returns the total position including both flushed and buffered data. + * + * @return current position (flushed + buffered bytes) + */ + public synchronized long getPos() throws IOException { + // Guard against NPE if called after close() + if (buffer == null) { + return position; + } + + // Return current position (flushed + buffered) + return position + buffer.position(); + } + public static String getParentDirectory(String path) { int protoIndex = path.indexOf("://"); if (protoIndex >= 0) { - int pathStart = path.indexOf("/", protoIndex+3); + int pathStart = path.indexOf("/", protoIndex + 3); + if (pathStart < 0) { + // No path segment; treat as root (e.g., "seaweedfs://host") + return "/"; + } path = path.substring(pathStart); } if (path.equals("/")) { @@ -116,6 +134,13 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { + + // Set the file size in attributes based on our position + // This ensures Parquet footer metadata matches what we actually wrote + FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder(); + attrBuilder.setFileSize(offset); + entry.setAttributes(attrBuilder); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); @@ -125,7 +150,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public void write(final int byteVal) throws IOException { - write(new byte[]{(byte) (byteVal & 0xFF)}); + write(new byte[] { (byte) (byteVal & 0xFF) }); } @Override @@ -141,8 +166,6 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } - // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); - int currentOffset = off; int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; @@ -154,9 +177,11 @@ public class SeaweedOutputStream extends OutputStream { break; } - // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + + // ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); currentOffset += writableBytes; + writeCurrentBufferToService(); numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; writableBytes = bufferSize - buffer.position(); @@ -191,7 +216,6 @@ public class SeaweedOutputStream extends OutputStream { return; } - LOG.debug("close path: {}", path); try { flushInternal(); threadExecutor.shutdown(); @@ -209,28 +233,35 @@ public class SeaweedOutputStream extends OutputStream { } private synchronized void writeCurrentBufferToService() throws IOException { - if (buffer.position() == 0) { + int bufferPos = buffer.position(); + + if (bufferPos == 0) { return; } - position += submitWriteBufferToService(buffer, position); + int written = submitWriteBufferToService(buffer, position); + position += written; buffer = ByteBufferPool.request(bufferSize); } - private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) + throws IOException { - ((Buffer)bufferToWrite).flip(); + ((Buffer) bufferToWrite).flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } final Future<Void> job = completionService.submit(() -> { - // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); - // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + // System.out.println(path + " is going to save [" + (writePosition) + "," + + // ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), + bufferToWrite.position(), bufferToWrite.limit(), path); + // System.out.println(path + " saved [" + (writePosition) + "," + + // ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; }); @@ -318,12 +349,10 @@ public class SeaweedOutputStream extends OutputStream { private static class WriteOperation { private final Future<Void> task; - private final long startOffset; private final long length; WriteOperation(final Future<Void> task, final long startOffset, final long length) { this.task = task; - this.startOffset = startOffset; this.length = length; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index cac85d186..3fd184671 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -24,9 +24,10 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals, - final long position, final ByteBuffer buf, final long fileSize) throws IOException { + final long position, final ByteBuffer buf, final long fileSize) throws IOException { - List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); + int originalRemaining = buf.remaining(); + List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, originalRemaining); Map<String, FilerProto.Locations> knownLocations = new HashMap<>(); @@ -51,7 +52,7 @@ public class SeaweedRead { } } - //TODO parallel this + // TODO parallel this long readCount = 0; long startOffset = position; for (ChunkView chunkView : chunkViews) { @@ -59,7 +60,7 @@ public class SeaweedRead { if (startOffset < chunkView.logicOffset) { long gap = chunkView.logicOffset - startOffset; LOG.debug("zero [{},{})", startOffset, startOffset + gap); - buf.position(buf.position()+ (int)gap); + buf.position(buf.position() + (int) gap); readCount += gap; startOffset += gap; } @@ -81,12 +82,17 @@ public class SeaweedRead { } - long limit = Math.min(buf.limit(), fileSize); + // Fix: Calculate the correct limit based on the read position and requested + // size, + // not the buffer's absolute limit. This fixes the 78-byte EOF error when + // seeking + // near the end of the file. + long limit = Math.min(position + originalRemaining, fileSize); if (startOffset < limit) { long gap = limit - startOffset; LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); - buf.position(buf.position()+ (int)gap); + buf.position(buf.position() + (int) gap); readCount += gap; startOffset += gap; } @@ -94,7 +100,8 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, + FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -105,13 +112,15 @@ public class SeaweedRead { int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset); LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset); + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, + chunkView.logicOffset + chunkView.size, startOffset); buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len); return len; } - public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, + FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; @@ -214,8 +223,7 @@ public class SeaweedRead { chunkStart, isFullChunk, chunk.cipherKey, - chunk.isCompressed - )); + chunk.isCompressed)); } } return views; @@ -239,7 +247,10 @@ public class SeaweedRead { } public static long fileSize(FilerProto.Entry entry) { - return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize()); + long chunksSize = totalSize(entry.getChunksList()); + long attrSize = entry.getAttributes().getFileSize(); + long finalSize = Math.max(chunksSize, attrSize); + return finalSize; } public static long totalSize(List<FilerProto.FileChunk> chunksList) { @@ -263,7 +274,8 @@ public class SeaweedRead { public final byte[] cipherKey; public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, + boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; @@ -297,7 +309,8 @@ public class SeaweedRead { public final byte[] cipherKey; public final boolean isCompressed; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, + boolean isCompressed) { this.fileId = fileId; this.offset = offset; this.size = size; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 88c7cefbe..0fadd53cc 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -2,6 +2,7 @@ package seaweedfs.client; import com.google.common.base.Strings; import com.google.protobuf.ByteString; +import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; @@ -25,13 +26,13 @@ public class SeaweedWrite { private static final SecureRandom random = new SecureRandom(); public static void writeData(FilerProto.Entry.Builder entry, - final String replication, - String collection, - final FilerClient filerClient, - final long offset, - final byte[] bytes, - final long bytesOffset, final long bytesLength, - final String path) throws IOException { + final String replication, + String collection, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, final long bytesLength, + final String path) throws IOException { IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { @@ -60,21 +61,50 @@ public class SeaweedWrite { } public static FilerProto.FileChunk.Builder writeChunk(final String replication, - final String collection, - final FilerClient filerClient, - final long offset, - final byte[] bytes, - final long bytesOffset, - final long bytesLength, - final String path) throws IOException { - FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) - .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) - .setDataCenter("") - .setTtlSec(0) - .setPath(path) - .build()); + final String collection, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, + final long bytesLength, + final String path) throws IOException { + + // Retry assignVolume call for transient network/server errors + FilerProto.AssignVolumeResponse response = null; + IOException lastException = null; + int maxRetries = 3; + + for (int attempt = 0; attempt < maxRetries; attempt++) { + try { + response = filerClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection( + Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) + .setReplication( + Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) + .setDataCenter("") + .setTtlSec(0) + .setPath(path) + .build()); + break; // Success, exit retry loop + } catch (io.grpc.StatusRuntimeException e) { + lastException = new IOException( + "assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e); + if (attempt < maxRetries - 1) { + try { + Thread.sleep(100 * (attempt + 1)); // Exponential backoff: 100ms, 200ms, 300ms + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during retry", ie); + } + } + } + } + + if (response == null) { + throw lastException != null ? lastException + : new IOException("assignVolume failed after " + maxRetries + " attempts"); + } if (!Strings.isNullOrEmpty(response.getError())) { throw new IOException(response.getError()); @@ -83,7 +113,8 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), + response.getLocation().getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; @@ -94,8 +125,6 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - LOG.debug("write file chunk {} size {}", targetUrl, bytesLength); - return FilerProto.FileChunk.newBuilder() .setFileId(fileId) .setOffset(offset) @@ -106,27 +135,28 @@ public class SeaweedWrite { } public static void writeMeta(final FilerClient filerClient, - final String parentDirectory, - final FilerProto.Entry.Builder entry) throws IOException { + final String parentDirectory, + final FilerProto.Entry.Builder entry) throws IOException { synchronized (entry) { - List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); + List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), + parentDirectory); + entry.clearChunks(); entry.addAllChunks(chunks); filerClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) .setEntry(entry) - .build() - ); + .build()); } } private static String multipartUpload(String targetUrl, - String auth, - final byte[] bytes, - final long bytesOffset, final long bytesLength, - byte[] cipherKey) throws IOException { + String auth, + final byte[] bytes, + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { MessageDigest md = null; try { md = MessageDigest.getInstance("MD5"); @@ -162,8 +192,10 @@ public class SeaweedWrite { try { if (response.getStatusLine().getStatusCode() / 100 != 2) { - if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) { - throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8")); + HttpEntity entity = response.getEntity(); + if (entity != null && entity.getContentType() != null + && entity.getContentType().getValue().equals("application/json")) { + throw new IOException(EntityUtils.toString(entity, "UTF-8")); } else { throw new IOException(response.getStatusLine().getReasonPhrase()); } diff --git a/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java new file mode 100644 index 000000000..d49e17e72 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java @@ -0,0 +1,303 @@ +package seaweedfs.client; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.*; + +/** + * Unit test to reproduce the Parquet EOF issue. + * + * The issue: When Parquet writes column chunks, it calls getPos() to record + * offsets. + * If getPos() returns a position that doesn't include buffered (unflushed) + * data, + * the footer metadata will have incorrect offsets. + * + * This test simulates Parquet's behavior: + * 1. Write some data (column chunk 1) + * 2. Call getPos() - Parquet records this as the END of chunk 1 + * 3. Write more data (column chunk 2) + * 4. Call getPos() - Parquet records this as the END of chunk 2 + * 5. Close the file + * 6. Verify that the recorded positions match the actual file content + * + * Prerequisites: + * - SeaweedFS master, volume server, and filer must be running + * - Default ports: filer HTTP 8888, filer gRPC 18888 + * + * To run: + * export SEAWEEDFS_TEST_ENABLED=true + * cd other/java/client + * mvn test -Dtest=GetPosBufferTest + */ +public class GetPosBufferTest { + + private FilerClient filerClient; + private static final String TEST_ROOT = "/test-getpos-buffer"; + private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + + @Before + public void setUp() throws Exception { + if (!TESTS_ENABLED) { + return; + } + + String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); + String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888"); + + filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort)); + + // Clean up any existing test directory + if (filerClient.exists(TEST_ROOT)) { + filerClient.rm(TEST_ROOT, true, true); + } + + // Create test root directory + filerClient.mkdirs(TEST_ROOT, 0755); + } + + @After + public void tearDown() throws Exception { + if (!TESTS_ENABLED) { + return; + } + if (filerClient != null) { + filerClient.rm(TEST_ROOT, true, true); + filerClient.shutdown(); + } + } + + @Test + public void testGetPosWithBufferedData() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with buffered data ==="); + + String testPath = TEST_ROOT + "/getpos-test.bin"; + + // Simulate what Parquet does when writing column chunks + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Write "column chunk 1" - 100 bytes + byte[] chunk1 = new byte[100]; + for (int i = 0; i < 100; i++) { + chunk1[i] = (byte) i; + } + outputStream.write(chunk1); + + // Parquet calls getPos() here to record end of chunk 1 + long posAfterChunk1 = outputStream.getPos(); + System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1); + assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1); + + // Write "column chunk 2" - 200 bytes + byte[] chunk2 = new byte[200]; + for (int i = 0; i < 200; i++) { + chunk2[i] = (byte) (i + 100); + } + outputStream.write(chunk2); + + // Parquet calls getPos() here to record end of chunk 2 + long posAfterChunk2 = outputStream.getPos(); + System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2); + assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2); + + // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!) + byte[] chunk3 = new byte[78]; + for (int i = 0; i < 78; i++) { + chunk3[i] = (byte) (i + 50); + } + outputStream.write(chunk3); + + // Parquet calls getPos() here to record end of chunk 3 + long posAfterChunk3 = outputStream.getPos(); + System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3); + assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3); + + // Close to flush everything + outputStream.close(); + System.out.println("File closed successfully"); + + // Now read the file and verify its actual size matches what getPos() reported + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + + long actualFileSize = SeaweedRead.fileSize(entry); + System.out.println("Actual file size on disk: " + actualFileSize); + + assertEquals("File size should match the last getPos() value", 378, actualFileSize); + + // Now read the file and verify we can read all the data + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + + byte[] readBuffer = new byte[500]; // Larger buffer to read everything + int totalRead = 0; + int bytesRead; + while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) { + totalRead += bytesRead; + } + inputStream.close(); + + System.out.println("Total bytes read: " + totalRead); + assertEquals("Should read exactly 378 bytes", 378, totalRead); + + // Verify the data is correct + for (int i = 0; i < 100; i++) { + assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]); + } + for (int i = 0; i < 200; i++) { + assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]); + } + for (int i = 0; i < 78; i++) { + assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]); + } + + System.out.println("SUCCESS: All data verified correctly!\n"); + } + + @Test + public void testGetPosWithSmallWrites() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ==="); + + String testPath = TEST_ROOT + "/small-writes-test.bin"; + + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Parquet writes column data in small chunks and frequently calls getPos() + String[] columnData = { "Alice", "Bob", "Charlie", "David" }; + long[] recordedPositions = new long[columnData.length]; + + for (int i = 0; i < columnData.length; i++) { + byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8); + outputStream.write(data); + + // Parquet calls getPos() after each value to track offsets + recordedPositions[i] = outputStream.getPos(); + System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]); + } + + long finalPos = outputStream.getPos(); + System.out.println("Final position before close: " + finalPos); + + outputStream.close(); + + // Verify file size + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + long actualFileSize = SeaweedRead.fileSize(entry); + + System.out.println("Actual file size: " + actualFileSize); + assertEquals("File size should match final getPos()", finalPos, actualFileSize); + + // Verify we can read using the recorded positions + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + + long currentPos = 0; + for (int i = 0; i < columnData.length; i++) { + long nextPos = recordedPositions[i]; + int length = (int) (nextPos - currentPos); + + byte[] buffer = new byte[length]; + int bytesRead = inputStream.read(buffer, 0, length); + + assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead); + + String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); + System.out.println("Read at offset " + currentPos + ": '" + readData + "'"); + assertEquals("Data mismatch", columnData[i], readData); + + currentPos = nextPos; + } + + inputStream.close(); + + System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n"); + } + + @Test + public void testGetPosWithExactly78BytesBuffered() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ==="); + + String testPath = TEST_ROOT + "/78-bytes-test.bin"; + + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Write some initial data + byte[] initial = new byte[1000]; + for (int i = 0; i < 1000; i++) { + initial[i] = (byte) i; + } + outputStream.write(initial); + outputStream.flush(); // Ensure this is flushed + + long posAfterFlush = outputStream.getPos(); + System.out.println("Position after 1000 bytes + flush: " + posAfterFlush); + assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush); + + // Now write EXACTLY 78 bytes (the problematic buffer size in our bug) + byte[] problematicChunk = new byte[78]; + for (int i = 0; i < 78; i++) { + problematicChunk[i] = (byte) (i + 50); + } + outputStream.write(problematicChunk); + + // DO NOT FLUSH - this is the bug scenario! + // Parquet calls getPos() here while the 78 bytes are still buffered + long posWithBufferedData = outputStream.getPos(); + System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData); + + // This MUST return 1078, not 1000! + assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData); + + // Now close (which will flush) + outputStream.close(); + + // Verify actual file size + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + long actualFileSize = SeaweedRead.fileSize(entry); + + System.out.println("Actual file size: " + actualFileSize); + assertEquals("File size must be 1078", 1078, actualFileSize); + + // Try to read at position 1000 for 78 bytes (what Parquet would try) + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1000); + + byte[] readBuffer = new byte[78]; + int bytesRead = inputStream.read(readBuffer, 0, 78); + + System.out.println("Bytes read at position 1000: " + bytesRead); + assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead); + + // Verify the data matches + for (int i = 0; i < 78; i++) { + assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]); + } + + inputStream.close(); + + System.out.println("SUCCESS: getPos() correctly includes buffered data!\n"); + } +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java index f384e059f..3cfb2ce9e 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java @@ -28,22 +28,21 @@ public class SeaweedStreamIntegrationTest { private FilerClient filerClient; private static final String TEST_ROOT = "/test-stream-integration"; - private static final boolean TESTS_ENABLED = - "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); @Before public void setUp() throws Exception { if (!TESTS_ENABLED) { return; } - + filerClient = new FilerClient("localhost", 18888); - + // Clean up any existing test directory if (filerClient.exists(TEST_ROOT)) { filerClient.rm(TEST_ROOT, true, true); } - + // Create test root directory filerClient.mkdirs(TEST_ROOT, 0755); } @@ -53,7 +52,7 @@ public class SeaweedStreamIntegrationTest { if (!TESTS_ENABLED || filerClient == null) { return; } - + try { // Clean up test directory if (filerClient.exists(TEST_ROOT)) { @@ -70,30 +69,29 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/small.txt"; String testContent = "Hello, SeaweedFS!"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Verify file exists assertTrue("File should exist", filerClient.exists(testPath)); - + // Read file FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); assertNotNull("Entry should not be null", entry); - + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[testContent.length()]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read all bytes", testContent.length(), bytesRead); assertEquals("Content should match", testContent, new String(buffer, StandardCharsets.UTF_8)); } @@ -104,43 +102,42 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/large.bin"; int fileSize = 10 * 1024 * 1024; // 10 MB - + // Generate random data byte[] originalData = new byte[fileSize]; new Random(42).nextBytes(originalData); // Use seed for reproducibility - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(originalData); outputStream.close(); - + // Verify file exists assertTrue("File should exist", filerClient.exists(testPath)); - + // Read file FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); assertNotNull("Entry should not be null", entry); - + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); - + // Read file in chunks to handle large files properly byte[] readData = new byte[fileSize]; int totalRead = 0; int bytesRead; byte[] buffer = new byte[8192]; // Read in 8KB chunks - + while ((bytesRead = inputStream.read(buffer)) > 0) { System.arraycopy(buffer, 0, readData, totalRead, bytesRead); totalRead += bytesRead; } inputStream.close(); - + assertEquals("Should read all bytes", fileSize, totalRead); assertArrayEquals("Content should match", originalData, readData); } @@ -151,31 +148,30 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/chunked.txt"; - String[] chunks = {"First chunk. ", "Second chunk. ", "Third chunk."}; - + String[] chunks = { "First chunk. ", "Second chunk. ", "Third chunk." }; + // Write file in chunks SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); for (String chunk : chunks) { outputStream.write(chunk.getBytes(StandardCharsets.UTF_8)); } outputStream.close(); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[1024]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + String expected = String.join("", chunks); String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); - + assertEquals("Content should match", expected, actual); } @@ -185,31 +181,30 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/offset.txt"; String testContent = "0123456789ABCDEFGHIJ"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read with offset FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); inputStream.seek(10); // Skip first 10 bytes - + byte[] buffer = new byte[10]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read 10 bytes", 10, bytesRead); - assertEquals("Should read from offset", "ABCDEFGHIJ", - new String(buffer, StandardCharsets.UTF_8)); + assertEquals("Should read from offset", "ABCDEFGHIJ", + new String(buffer, StandardCharsets.UTF_8)); } @Test @@ -218,32 +213,31 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/partial.txt"; String testContent = "The quick brown fox jumps over the lazy dog"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read partial FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); - + // Read only "quick brown" inputStream.seek(4); byte[] buffer = new byte[11]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read 11 bytes", 11, bytesRead); - assertEquals("Should read partial content", "quick brown", - new String(buffer, StandardCharsets.UTF_8)); + assertEquals("Should read partial content", "quick brown", + new String(buffer, StandardCharsets.UTF_8)); } @Test @@ -252,28 +246,27 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/empty.txt"; - + // Write empty file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.close(); - + // Verify file exists assertTrue("File should exist", filerClient.exists(testPath)); - + // Read empty file FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); assertNotNull("Entry should not be null", entry); - + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[100]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read 0 bytes from empty file", -1, bytesRead); } @@ -283,32 +276,31 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/overwrite.txt"; String originalContent = "Original content"; String newContent = "New content that overwrites the original"; - + // Write original file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(originalContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Overwrite file outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(newContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[1024]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); assertEquals("Should have new content", newContent, actual); } @@ -319,23 +311,22 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/multireads.txt"; String testContent = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read in multiple small chunks FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); - + StringBuilder result = new StringBuilder(); byte[] buffer = new byte[5]; int bytesRead; @@ -343,7 +334,7 @@ public class SeaweedStreamIntegrationTest { result.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); } inputStream.close(); - + assertEquals("Should read entire content", testContent, result.toString()); } @@ -353,29 +344,28 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/binary.bin"; byte[] binaryData = new byte[256]; for (int i = 0; i < 256; i++) { binaryData[i] = (byte) i; } - + // Write binary file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(binaryData); outputStream.close(); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] readData = new byte[256]; int bytesRead = inputStream.read(readData); inputStream.close(); - + assertEquals("Should read all bytes", 256, bytesRead); assertArrayEquals("Binary data should match", binaryData, readData); } @@ -386,32 +376,132 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/flush.txt"; String testContent = "Content to flush"; - + // Write file with flush SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); // Explicitly flush outputStream.close(); - + // Verify file was written assertTrue("File should exist after flush", filerClient.exists(testPath)); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[testContent.length()]; int bytesRead = inputStream.read(buffer); inputStream.close(); - - assertEquals("Content should match", testContent, - new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); + + assertEquals("Content should match", testContent, + new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); } -} + /** + * Tests range reads similar to how Parquet reads column chunks. + * This simulates: + * 1. Seeking to specific offsets + * 2. Reading specific byte ranges + * 3. Verifying each read() call returns the correct number of bytes + * + * This test specifically addresses the bug where read() was returning 0 + * for inline content or -1 prematurely for chunked reads. + */ + @Test + public void testRangeReads() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + String testPath = TEST_ROOT + "/rangereads.dat"; + + // Create a 1275-byte file (similar to the Parquet file size that was failing) + byte[] testData = new byte[1275]; + Random random = new Random(42); // Fixed seed for reproducibility + random.nextBytes(testData); + + // Write file + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + outputStream.write(testData); + outputStream.close(); + + // Read file entry + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + + // Test 1: Read last 8 bytes (like reading Parquet footer length) + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1267); + byte[] buffer = new byte[8]; + int bytesRead = inputStream.read(buffer, 0, 8); + assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead); + assertArrayEquals("Content at offset 1267 should match", + Arrays.copyOfRange(testData, 1267, 1275), buffer); + inputStream.close(); + + // Test 2: Read large chunk in middle (like reading column data) + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(383); + buffer = new byte[884]; // Read bytes 383-1267 + bytesRead = inputStream.read(buffer, 0, 884); + assertEquals("Should read 884 bytes at offset 383", 884, bytesRead); + assertArrayEquals("Content at offset 383 should match", + Arrays.copyOfRange(testData, 383, 1267), buffer); + inputStream.close(); + + // Test 3: Read from beginning (like reading Parquet magic bytes) + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + buffer = new byte[4]; + bytesRead = inputStream.read(buffer, 0, 4); + assertEquals("Should read 4 bytes at offset 0", 4, bytesRead); + assertArrayEquals("Content at offset 0 should match", + Arrays.copyOfRange(testData, 0, 4), buffer); + inputStream.close(); + + // Test 4: Multiple sequential reads without seeking (like + // H2SeekableInputStream.readFully) + // This is the critical test case that was failing! + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1197); // Position where EOF was being returned prematurely + + byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes + int totalRead = 0; + int offset = 0; + int remaining = 78; + + // Simulate Parquet's H2SeekableInputStream.readFully() loop + while (remaining > 0) { + int read = inputStream.read(fullBuffer, offset, remaining); + if (read == -1) { + fail(String.format( + "Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)", + totalRead, remaining)); + } + assertTrue("Each read() should return positive bytes", read > 0); + totalRead += read; + offset += read; + remaining -= read; + } + + assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead); + assertArrayEquals("Content at offset 1197 should match", + Arrays.copyOfRange(testData, 1197, 1275), fullBuffer); + inputStream.close(); + + // Test 5: Read entire file in one go + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + byte[] allData = new byte[1275]; + bytesRead = inputStream.read(allData, 0, 1275); + assertEquals("Should read entire 1275 bytes", 1275, bytesRead); + assertArrayEquals("Entire content should match", testData, allData); + inputStream.close(); + } +} |
