about | summary | refs | log | tree | commit | diff
path: root/other/java/client/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client/src/test')
-rw-r--r-- other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java | 303
-rw-r--r-- other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java | 302
2 files changed, 499 insertions, 106 deletions
diff --git a/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java
new file mode 100644
index 000000000..d49e17e72
--- /dev/null
+++ b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java
@@ -0,0 +1,303 @@
+package seaweedfs.client;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test to reproduce the Parquet EOF issue.
+ *
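+ * The issue: when Parquet writes column chunks, it calls getPos() to record
+ * their offsets. If getPos() returns a position that does not include
+ * buffered (unflushed) data, the footer metadata ends up with incorrect
+ * offsets. The tests below therefore compare getPos() with the number of
+ * bytes written so far, including bytes that have not been flushed yet.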
+ *
+ * This test simulates Parquet's behavior:
+ * 1. Write some data (column chunk 1)
+ * 2. Call getPos() - Parquet records this as the END of chunk 1
+ * 3. Write more data (column chunk 2)
+ * 4. Call getPos() - Parquet records this as the END of chunk 2
+ * 5. Close the file
+ * 6. Verify that the recorded positions match the actual file content
+ *
+ * Prerequisites:
+ * - SeaweedFS master, volume server, and filer must be running
+ * - Default ports: filer HTTP 8888, filer gRPC 18888
+ *
+ * To run:
+ * export SEAWEEDFS_TEST_ENABLED=true
+ * cd other/java/client
+ * mvn test -Dtest=GetPosBufferTest
+ */
+public class GetPosBufferTest {
+
+ private FilerClient filerClient;
+ private static final String TEST_ROOT = "/test-getpos-buffer";
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+
+ @Before
+ public void setUp() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+
+ String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+ String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
+
+ filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort));
+
+ // Clean up any existing test directory
+ if (filerClient.exists(TEST_ROOT)) {
+ filerClient.rm(TEST_ROOT, true, true);
+ }
+
+ // Create test root directory
+ filerClient.mkdirs(TEST_ROOT, 0755);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ if (filerClient != null) {
+ filerClient.rm(TEST_ROOT, true, true);
+ filerClient.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetPosWithBufferedData() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with buffered data ===");
+
+ String testPath = TEST_ROOT + "/getpos-test.bin";
+
+ // Simulate what Parquet does when writing column chunks
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write "column chunk 1" - 100 bytes
+ byte[] chunk1 = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ chunk1[i] = (byte) i;
+ }
+ outputStream.write(chunk1);
+
+ // Parquet calls getPos() here to record end of chunk 1
+ long posAfterChunk1 = outputStream.getPos();
+ System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1);
+ assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1);
+
+ // Write "column chunk 2" - 200 bytes
+ byte[] chunk2 = new byte[200];
+ for (int i = 0; i < 200; i++) {
+ chunk2[i] = (byte) (i + 100);
+ }
+ outputStream.write(chunk2);
+
+ // Parquet calls getPos() here to record end of chunk 2
+ long posAfterChunk2 = outputStream.getPos();
+ System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2);
+ assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2);
+
+ // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!)
+ byte[] chunk3 = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ chunk3[i] = (byte) (i + 50);
+ }
+ outputStream.write(chunk3);
+
+ // Parquet calls getPos() here to record end of chunk 3
+ long posAfterChunk3 = outputStream.getPos();
+ System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3);
+ assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3);
+
+ // Close to flush everything
+ outputStream.close();
+ System.out.println("File closed successfully");
+
+ // Look up the file's entry and verify its actual size matches what getPos() reported
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ long actualFileSize = SeaweedRead.fileSize(entry);
+ System.out.println("Actual file size on disk: " + actualFileSize);
+
+ assertEquals("File size should match the last getPos() value", 378, actualFileSize);
+
+ // Now read the file and verify we can read all the data
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ byte[] readBuffer = new byte[500]; // Larger buffer to read everything
+ int totalRead = 0;
+ int bytesRead;
+ while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) {
+ totalRead += bytesRead;
+ }
+ inputStream.close();
+
+ System.out.println("Total bytes read: " + totalRead);
+ assertEquals("Should read exactly 378 bytes", 378, totalRead);
+
+ // Verify the data is correct
+ for (int i = 0; i < 100; i++) {
+ assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]);
+ }
+ for (int i = 0; i < 200; i++) {
+ assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]);
+ }
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]);
+ }
+
+ System.out.println("SUCCESS: All data verified correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithSmallWrites() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ===");
+
+ String testPath = TEST_ROOT + "/small-writes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Parquet writes column data in small chunks and frequently calls getPos()
+ String[] columnData = { "Alice", "Bob", "Charlie", "David" };
+ long[] recordedPositions = new long[columnData.length];
+
+ for (int i = 0; i < columnData.length; i++) {
+ byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8);
+ outputStream.write(data);
+
+ // Parquet calls getPos() after each value to track offsets
+ recordedPositions[i] = outputStream.getPos();
+ System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]);
+ }
+
+ long finalPos = outputStream.getPos();
+ System.out.println("Final position before close: " + finalPos);
+
+ outputStream.close();
+
+ // Verify file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size should match final getPos()", finalPos, actualFileSize);
+
+ // Verify we can read using the recorded positions
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ long currentPos = 0;
+ for (int i = 0; i < columnData.length; i++) {
+ long nextPos = recordedPositions[i];
+ int length = (int) (nextPos - currentPos);
+
+ byte[] buffer = new byte[length];
+ int bytesRead = inputStream.read(buffer, 0, length);
+
+ assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead);
+
+ String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
+ System.out.println("Read at offset " + currentPos + ": '" + readData + "'");
+ assertEquals("Data mismatch", columnData[i], readData);
+
+ currentPos = nextPos;
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithExactly78BytesBuffered() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ===");
+
+ String testPath = TEST_ROOT + "/78-bytes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write some initial data
+ byte[] initial = new byte[1000];
+ for (int i = 0; i < 1000; i++) {
+ initial[i] = (byte) i;
+ }
+ outputStream.write(initial);
+ outputStream.flush(); // Ensure this is flushed
+
+ long posAfterFlush = outputStream.getPos();
+ System.out.println("Position after 1000 bytes + flush: " + posAfterFlush);
+ assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush);
+
+ // Now write EXACTLY 78 bytes (the problematic buffer size in our bug)
+ byte[] problematicChunk = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ problematicChunk[i] = (byte) (i + 50);
+ }
+ outputStream.write(problematicChunk);
+
+ // DO NOT FLUSH - this is the bug scenario!
+ // Parquet calls getPos() here while the 78 bytes are still buffered
+ long posWithBufferedData = outputStream.getPos();
+ System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData);
+
+ // This MUST return 1078, not 1000!
+ assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData);
+
+ // Now close (which will flush)
+ outputStream.close();
+
+ // Verify actual file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size must be 1078", 1078, actualFileSize);
+
+ // Try to read at position 1000 for 78 bytes (what Parquet would try)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1000);
+
+ byte[] readBuffer = new byte[78];
+ int bytesRead = inputStream.read(readBuffer, 0, 78);
+
+ System.out.println("Bytes read at position 1000: " + bytesRead);
+ assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead);
+
+ // Verify the data matches
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]);
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: getPos() correctly includes buffered data!\n");
+ }
+}
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
index f384e059f..3cfb2ce9e 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java
@@ -28,22 +28,21 @@ public class SeaweedStreamIntegrationTest {
private FilerClient filerClient;
private static final String TEST_ROOT = "/test-stream-integration";
- private static final boolean TESTS_ENABLED =
- "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
@Before
public void setUp() throws Exception {
if (!TESTS_ENABLED) {
return;
}
-
+
filerClient = new FilerClient("localhost", 18888);
-
+
// Clean up any existing test directory
if (filerClient.exists(TEST_ROOT)) {
filerClient.rm(TEST_ROOT, true, true);
}
-
+
// Create test root directory
filerClient.mkdirs(TEST_ROOT, 0755);
}
@@ -53,7 +52,7 @@ public class SeaweedStreamIntegrationTest {
if (!TESTS_ENABLED || filerClient == null) {
return;
}
-
+
try {
// Clean up test directory
if (filerClient.exists(TEST_ROOT)) {
@@ -70,30 +69,29 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/small.txt";
String testContent = "Hello, SeaweedFS!";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[testContent.length()];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read all bytes", testContent.length(), bytesRead);
assertEquals("Content should match", testContent, new String(buffer, StandardCharsets.UTF_8));
}
@@ -104,43 +102,42 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/large.bin";
int fileSize = 10 * 1024 * 1024; // 10 MB
-
+
// Generate random data
byte[] originalData = new byte[fileSize];
new Random(42).nextBytes(originalData); // Use seed for reproducibility
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(originalData);
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
// Read file in chunks to handle large files properly
byte[] readData = new byte[fileSize];
int totalRead = 0;
int bytesRead;
byte[] buffer = new byte[8192]; // Read in 8KB chunks
-
+
while ((bytesRead = inputStream.read(buffer)) > 0) {
System.arraycopy(buffer, 0, readData, totalRead, bytesRead);
totalRead += bytesRead;
}
inputStream.close();
-
+
assertEquals("Should read all bytes", fileSize, totalRead);
assertArrayEquals("Content should match", originalData, readData);
}
@@ -151,31 +148,30 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/chunked.txt";
- String[] chunks = {"First chunk. ", "Second chunk. ", "Third chunk."};
-
+ String[] chunks = { "First chunk. ", "Second chunk. ", "Third chunk." };
+
// Write file in chunks
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
for (String chunk : chunks) {
outputStream.write(chunk.getBytes(StandardCharsets.UTF_8));
}
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
String expected = String.join("", chunks);
String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
-
+
assertEquals("Content should match", expected, actual);
}
@@ -185,31 +181,30 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/offset.txt";
String testContent = "0123456789ABCDEFGHIJ";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read with offset
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
inputStream.seek(10); // Skip first 10 bytes
-
+
byte[] buffer = new byte[10];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 10 bytes", 10, bytesRead);
- assertEquals("Should read from offset", "ABCDEFGHIJ",
- new String(buffer, StandardCharsets.UTF_8));
+ assertEquals("Should read from offset", "ABCDEFGHIJ",
+ new String(buffer, StandardCharsets.UTF_8));
}
@Test
@@ -218,32 +213,31 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/partial.txt";
String testContent = "The quick brown fox jumps over the lazy dog";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read partial
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
// Read only "quick brown"
inputStream.seek(4);
byte[] buffer = new byte[11];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 11 bytes", 11, bytesRead);
- assertEquals("Should read partial content", "quick brown",
- new String(buffer, StandardCharsets.UTF_8));
+ assertEquals("Should read partial content", "quick brown",
+ new String(buffer, StandardCharsets.UTF_8));
}
@Test
@@ -252,28 +246,27 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/empty.txt";
-
+
// Write empty file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.close();
-
+
// Verify file exists
assertTrue("File should exist", filerClient.exists(testPath));
-
+
// Read empty file
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
assertNotNull("Entry should not be null", entry);
-
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[100];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
assertEquals("Should read 0 bytes from empty file", -1, bytesRead);
}
@@ -283,32 +276,31 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/overwrite.txt";
String originalContent = "Original content";
String newContent = "New content that overwrites the original";
-
+
// Write original file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(originalContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Overwrite file
outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(newContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
+
String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
assertEquals("Should have new content", newContent, actual);
}
@@ -319,23 +311,22 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/multireads.txt";
String testContent = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
-
+
// Write file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.close();
-
+
// Read in multiple small chunks
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
-
+
StringBuilder result = new StringBuilder();
byte[] buffer = new byte[5];
int bytesRead;
@@ -343,7 +334,7 @@ public class SeaweedStreamIntegrationTest {
result.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
inputStream.close();
-
+
assertEquals("Should read entire content", testContent, result.toString());
}
@@ -353,29 +344,28 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/binary.bin";
byte[] binaryData = new byte[256];
for (int i = 0; i < 256; i++) {
binaryData[i] = (byte) i;
}
-
+
// Write binary file
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(binaryData);
outputStream.close();
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] readData = new byte[256];
int bytesRead = inputStream.read(readData);
inputStream.close();
-
+
assertEquals("Should read all bytes", 256, bytesRead);
assertArrayEquals("Binary data should match", binaryData, readData);
}
@@ -386,32 +376,132 @@ public class SeaweedStreamIntegrationTest {
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
return;
}
-
+
String testPath = TEST_ROOT + "/flush.txt";
String testContent = "Content to flush";
-
+
// Write file with flush
SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
outputStream.write(testContent.getBytes(StandardCharsets.UTF_8));
outputStream.flush(); // Explicitly flush
outputStream.close();
-
+
// Verify file was written
assertTrue("File should exist after flush", filerClient.exists(testPath));
-
+
// Read and verify
FilerProto.Entry entry = filerClient.lookupEntry(
- SeaweedOutputStream.getParentDirectory(testPath),
- SeaweedOutputStream.getFileName(testPath)
- );
-
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
byte[] buffer = new byte[testContent.length()];
int bytesRead = inputStream.read(buffer);
inputStream.close();
-
- assertEquals("Content should match", testContent,
- new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
+
+ assertEquals("Content should match", testContent,
+ new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
}
-}
+ /**
+ * Tests range reads similar to how Parquet reads column chunks.
+ * This simulates:
+ * 1. Seeking to specific offsets
+ * 2. Reading specific byte ranges
+ * 3. Verifying each read() call returns the correct number of bytes
+ *
+ * This test specifically addresses the bug where read() was returning 0
+ * for inline content or -1 prematurely for chunked reads.
+ */
+ @Test
+ public void testRangeReads() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ String testPath = TEST_ROOT + "/rangereads.dat";
+
+ // Create a 1275-byte file (similar to the Parquet file size that was failing)
+ byte[] testData = new byte[1275];
+ Random random = new Random(42); // Fixed seed for reproducibility
+ random.nextBytes(testData);
+
+ // Write file
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+ outputStream.write(testData);
+ outputStream.close();
+
+ // Read file entry
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ // Test 1: Read last 8 bytes (like reading Parquet footer length)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1267);
+ byte[] buffer = new byte[8];
+ int bytesRead = inputStream.read(buffer, 0, 8);
+ assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead);
+ assertArrayEquals("Content at offset 1267 should match",
+ Arrays.copyOfRange(testData, 1267, 1275), buffer);
+ inputStream.close();
+
+ // Test 2: Read large chunk in middle (like reading column data)
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(383);
+ buffer = new byte[884]; // Read bytes 383-1267
+ bytesRead = inputStream.read(buffer, 0, 884);
+ assertEquals("Should read 884 bytes at offset 383", 884, bytesRead);
+ assertArrayEquals("Content at offset 383 should match",
+ Arrays.copyOfRange(testData, 383, 1267), buffer);
+ inputStream.close();
+
+ // Test 3: Read from beginning (like reading Parquet magic bytes)
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ buffer = new byte[4];
+ bytesRead = inputStream.read(buffer, 0, 4);
+ assertEquals("Should read 4 bytes at offset 0", 4, bytesRead);
+ assertArrayEquals("Content at offset 0 should match",
+ Arrays.copyOfRange(testData, 0, 4), buffer);
+ inputStream.close();
+
+ // Test 4: Multiple sequential reads after a single seek, with no seek between
+ // reads (like the loop in Parquet's H2SeekableInputStream.readFully).
+ // This is the critical test case that was failing!
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1197); // Position where EOF was being returned prematurely
+
+ byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes
+ int totalRead = 0;
+ int offset = 0;
+ int remaining = 78;
+
+ // Simulate Parquet's H2SeekableInputStream.readFully() loop
+ while (remaining > 0) {
+ int read = inputStream.read(fullBuffer, offset, remaining);
+ if (read == -1) {
+ fail(String.format(
+ "Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)",
+ totalRead, remaining));
+ }
+ assertTrue("Each read() should return positive bytes", read > 0);
+ totalRead += read;
+ offset += read;
+ remaining -= read;
+ }
+
+ assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead);
+ assertArrayEquals("Content at offset 1197 should match",
+ Arrays.copyOfRange(testData, 1197, 1275), fullBuffer);
+ inputStream.close();
+
+ // Test 5: Read entire file in one go
+ inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ byte[] allData = new byte[1275];
+ bytesRead = inputStream.read(allData, 0, 1275);
+ assertEquals("Should read entire 1275 bytes", 1275, bytesRead);
+ assertArrayEquals("Entire content should match", testData, allData);
+ inputStream.close();
+ }
+}