diff options
Diffstat (limited to 'other/java/client/src/test')
| -rw-r--r-- | other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java | 303 | ||||
| -rw-r--r-- | other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java | 302 |
2 files changed, 499 insertions, 106 deletions
diff --git a/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java new file mode 100644 index 000000000..d49e17e72 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/GetPosBufferTest.java @@ -0,0 +1,303 @@ +package seaweedfs.client; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.*; + +/** + * Unit test to reproduce the Parquet EOF issue. + * + * The issue: When Parquet writes column chunks, it calls getPos() to record + * offsets. + * If getPos() returns a position that doesn't include buffered (unflushed) + * data, + * the footer metadata will have incorrect offsets. + * + * This test simulates Parquet's behavior: + * 1. Write some data (column chunk 1) + * 2. Call getPos() - Parquet records this as the END of chunk 1 + * 3. Write more data (column chunk 2) + * 4. Call getPos() - Parquet records this as the END of chunk 2 + * 5. Close the file + * 6. Verify that the recorded positions match the actual file content + * + * Prerequisites: + * - SeaweedFS master, volume server, and filer must be running + * - Default ports: filer HTTP 8888, filer gRPC 18888 + * + * To run: + * export SEAWEEDFS_TEST_ENABLED=true + * cd other/java/client + * mvn test -Dtest=GetPosBufferTest + */ +public class GetPosBufferTest { + + private FilerClient filerClient; + private static final String TEST_ROOT = "/test-getpos-buffer"; + private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + + @Before + public void setUp() throws Exception { + if (!TESTS_ENABLED) { + return; + } + + String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); + String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888"); + + filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort)); + + // Clean up any existing test directory + if (filerClient.exists(TEST_ROOT)) { + filerClient.rm(TEST_ROOT, true, true); + } + + // Create test root directory + filerClient.mkdirs(TEST_ROOT, 0755); + } + + @After + public void tearDown() throws Exception { + if (!TESTS_ENABLED) { + return; + } + if (filerClient != null) { + filerClient.rm(TEST_ROOT, true, true); + filerClient.shutdown(); + } + } + + @Test + public void testGetPosWithBufferedData() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with buffered data ==="); + + String testPath = TEST_ROOT + "/getpos-test.bin"; + + // Simulate what Parquet does when writing column chunks + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Write "column chunk 1" - 100 bytes + byte[] chunk1 = new byte[100]; + for (int i = 0; i < 100; i++) { + chunk1[i] = (byte) i; + } + outputStream.write(chunk1); + + // Parquet calls getPos() here to record end of chunk 1 + long posAfterChunk1 = outputStream.getPos(); + System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1); + assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1); + + // Write "column chunk 2" - 200 bytes + byte[] chunk2 = new byte[200]; + for (int i = 0; i < 200; i++) { + chunk2[i] = (byte) (i + 100); + } + outputStream.write(chunk2); + + // Parquet calls getPos() here to record end of chunk 2 + long posAfterChunk2 = outputStream.getPos(); + System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2); + assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2); + + // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!) + byte[] chunk3 = new byte[78]; + for (int i = 0; i < 78; i++) { + chunk3[i] = (byte) (i + 50); + } + outputStream.write(chunk3); + + // Parquet calls getPos() here to record end of chunk 3 + long posAfterChunk3 = outputStream.getPos(); + System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3); + assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3); + + // Close to flush everything + outputStream.close(); + System.out.println("File closed successfully"); + + // Now read the file and verify its actual size matches what getPos() reported + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + + long actualFileSize = SeaweedRead.fileSize(entry); + System.out.println("Actual file size on disk: " + actualFileSize); + + assertEquals("File size should match the last getPos() value", 378, actualFileSize); + + // Now read the file and verify we can read all the data + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + + byte[] readBuffer = new byte[500]; // Larger buffer to read everything + int totalRead = 0; + int bytesRead; + while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) { + totalRead += bytesRead; + } + inputStream.close(); + + System.out.println("Total bytes read: " + totalRead); + assertEquals("Should read exactly 378 bytes", 378, totalRead); + + // Verify the data is correct + for (int i = 0; i < 100; i++) { + assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]); + } + for (int i = 0; i < 200; i++) { + assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]); + } + for (int i = 0; i < 78; i++) { + assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]); + } + + System.out.println("SUCCESS: All data verified correctly!\n"); + } + + @Test + public void testGetPosWithSmallWrites() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ==="); + + String testPath = TEST_ROOT + "/small-writes-test.bin"; + + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Parquet writes column data in small chunks and frequently calls getPos() + String[] columnData = { "Alice", "Bob", "Charlie", "David" }; + long[] recordedPositions = new long[columnData.length]; + + for (int i = 0; i < columnData.length; i++) { + byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8); + outputStream.write(data); + + // Parquet calls getPos() after each value to track offsets + recordedPositions[i] = outputStream.getPos(); + System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]); + } + + long finalPos = outputStream.getPos(); + System.out.println("Final position before close: " + finalPos); + + outputStream.close(); + + // Verify file size + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + long actualFileSize = SeaweedRead.fileSize(entry); + + System.out.println("Actual file size: " + actualFileSize); + assertEquals("File size should match final getPos()", finalPos, actualFileSize); + + // Verify we can read using the recorded positions + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + + long currentPos = 0; + for (int i = 0; i < columnData.length; i++) { + long nextPos = recordedPositions[i]; + int length = (int) (nextPos - currentPos); + + byte[] buffer = new byte[length]; + int bytesRead = inputStream.read(buffer, 0, length); + + assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead); + + String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); + System.out.println("Read at offset " + currentPos + ": '" + readData + "'"); + assertEquals("Data mismatch", columnData[i], readData); + + currentPos = nextPos; + } + + inputStream.close(); + + System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n"); + } + + @Test + public void testGetPosWithExactly78BytesBuffered() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ==="); + + String testPath = TEST_ROOT + "/78-bytes-test.bin"; + + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Write some initial data + byte[] initial = new byte[1000]; + for (int i = 0; i < 1000; i++) { + initial[i] = (byte) i; + } + outputStream.write(initial); + outputStream.flush(); // Ensure this is flushed + + long posAfterFlush = outputStream.getPos(); + System.out.println("Position after 1000 bytes + flush: " + posAfterFlush); + assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush); + + // Now write EXACTLY 78 bytes (the problematic buffer size in our bug) + byte[] problematicChunk = new byte[78]; + for (int i = 0; i < 78; i++) { + problematicChunk[i] = (byte) (i + 50); + } + outputStream.write(problematicChunk); + + // DO NOT FLUSH - this is the bug scenario! + // Parquet calls getPos() here while the 78 bytes are still buffered + long posWithBufferedData = outputStream.getPos(); + System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData); + + // This MUST return 1078, not 1000! + assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData); + + // Now close (which will flush) + outputStream.close(); + + // Verify actual file size + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + long actualFileSize = SeaweedRead.fileSize(entry); + + System.out.println("Actual file size: " + actualFileSize); + assertEquals("File size must be 1078", 1078, actualFileSize); + + // Try to read at position 1000 for 78 bytes (what Parquet would try) + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1000); + + byte[] readBuffer = new byte[78]; + int bytesRead = inputStream.read(readBuffer, 0, 78); + + System.out.println("Bytes read at position 1000: " + bytesRead); + assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead); + + // Verify the data matches + for (int i = 0; i < 78; i++) { + assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]); + } + + inputStream.close(); + + System.out.println("SUCCESS: getPos() correctly includes buffered data!\n"); + } +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java index f384e059f..3cfb2ce9e 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedStreamIntegrationTest.java @@ -28,22 +28,21 @@ public class SeaweedStreamIntegrationTest { private FilerClient filerClient; private static final String TEST_ROOT = "/test-stream-integration"; - private static final boolean TESTS_ENABLED = - "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); @Before public void setUp() throws Exception { if (!TESTS_ENABLED) { return; } - + filerClient = new FilerClient("localhost", 18888); - + // Clean up any existing test directory if (filerClient.exists(TEST_ROOT)) { filerClient.rm(TEST_ROOT, true, true); } - + // Create test root directory filerClient.mkdirs(TEST_ROOT, 0755); } @@ -53,7 +52,7 @@ public class SeaweedStreamIntegrationTest { if (!TESTS_ENABLED || filerClient == null) { return; } - + try { // Clean up test directory if (filerClient.exists(TEST_ROOT)) { @@ -70,30 +69,29 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/small.txt"; String testContent = "Hello, SeaweedFS!"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Verify file exists assertTrue("File should exist", filerClient.exists(testPath)); - + // Read file FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); assertNotNull("Entry should not be null", entry); - + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[testContent.length()]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read all bytes", testContent.length(), bytesRead); assertEquals("Content should match", testContent, new String(buffer, StandardCharsets.UTF_8)); } @@ -104,43 +102,42 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/large.bin"; int fileSize = 10 * 1024 * 1024; // 10 MB - + // Generate random data byte[] originalData = new byte[fileSize]; new Random(42).nextBytes(originalData); // Use seed for reproducibility - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(originalData); outputStream.close(); - + // Verify file exists assertTrue("File should exist", filerClient.exists(testPath)); - + // Read file FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); assertNotNull("Entry should not be null", entry); - + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); - + // Read file in chunks to handle large files properly byte[] readData = new byte[fileSize]; int totalRead = 0; int bytesRead; byte[] buffer = new byte[8192]; // Read in 8KB chunks - + while ((bytesRead = inputStream.read(buffer)) > 0) { System.arraycopy(buffer, 0, readData, totalRead, bytesRead); totalRead += bytesRead; } inputStream.close(); - + assertEquals("Should read all bytes", fileSize, totalRead); assertArrayEquals("Content should match", originalData, readData); } @@ -151,31 +148,30 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/chunked.txt"; - String[] chunks = {"First chunk. ", "Second chunk. ", "Third chunk."}; - + String[] chunks = { "First chunk. ", "Second chunk. ", "Third chunk." }; + // Write file in chunks SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); for (String chunk : chunks) { outputStream.write(chunk.getBytes(StandardCharsets.UTF_8)); } outputStream.close(); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[1024]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + String expected = String.join("", chunks); String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); - + assertEquals("Content should match", expected, actual); } @@ -185,31 +181,30 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/offset.txt"; String testContent = "0123456789ABCDEFGHIJ"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read with offset FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); inputStream.seek(10); // Skip first 10 bytes - + byte[] buffer = new byte[10]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read 10 bytes", 10, bytesRead); - assertEquals("Should read from offset", "ABCDEFGHIJ", - new String(buffer, StandardCharsets.UTF_8)); + assertEquals("Should read from offset", "ABCDEFGHIJ", + new String(buffer, StandardCharsets.UTF_8)); } @Test @@ -218,32 +213,31 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/partial.txt"; String testContent = "The quick brown fox jumps over the lazy dog"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read partial FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); - + // Read only "quick brown" inputStream.seek(4); byte[] buffer = new byte[11]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read 11 bytes", 11, bytesRead); - assertEquals("Should read partial content", "quick brown", - new String(buffer, StandardCharsets.UTF_8)); + assertEquals("Should read partial content", "quick brown", + new String(buffer, StandardCharsets.UTF_8)); } @Test @@ -252,28 +246,27 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/empty.txt"; - + // Write empty file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.close(); - + // Verify file exists assertTrue("File should exist", filerClient.exists(testPath)); - + // Read empty file FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); assertNotNull("Entry should not be null", entry); - + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[100]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + assertEquals("Should read 0 bytes from empty file", -1, bytesRead); } @@ -283,32 +276,31 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/overwrite.txt"; String originalContent = "Original content"; String newContent = "New content that overwrites the original"; - + // Write original file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(originalContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Overwrite file outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(newContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[1024]; int bytesRead = inputStream.read(buffer); inputStream.close(); - + String actual = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); assertEquals("Should have new content", newContent, actual); } @@ -319,23 +311,22 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/multireads.txt"; String testContent = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - + // Write file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.close(); - + // Read in multiple small chunks FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); - + StringBuilder result = new StringBuilder(); byte[] buffer = new byte[5]; int bytesRead; @@ -343,7 +334,7 @@ public class SeaweedStreamIntegrationTest { result.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); } inputStream.close(); - + assertEquals("Should read entire content", testContent, result.toString()); } @@ -353,29 +344,28 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/binary.bin"; byte[] binaryData = new byte[256]; for (int i = 0; i < 256; i++) { binaryData[i] = (byte) i; } - + // Write binary file SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(binaryData); outputStream.close(); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] readData = new byte[256]; int bytesRead = inputStream.read(readData); inputStream.close(); - + assertEquals("Should read all bytes", 256, bytesRead); assertArrayEquals("Binary data should match", binaryData, readData); } @@ -386,32 +376,132 @@ public class SeaweedStreamIntegrationTest { System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); return; } - + String testPath = TEST_ROOT + "/flush.txt"; String testContent = "Content to flush"; - + // Write file with flush SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); outputStream.write(testContent.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); // Explicitly flush outputStream.close(); - + // Verify file was written assertTrue("File should exist after flush", filerClient.exists(testPath)); - + // Read and verify FilerProto.Entry entry = filerClient.lookupEntry( - SeaweedOutputStream.getParentDirectory(testPath), - SeaweedOutputStream.getFileName(testPath) - ); - + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); byte[] buffer = new byte[testContent.length()]; int bytesRead = inputStream.read(buffer); inputStream.close(); - - assertEquals("Content should match", testContent, - new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); + + assertEquals("Content should match", testContent, + new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); } -} + /** + * Tests range reads similar to how Parquet reads column chunks. + * This simulates: + * 1. Seeking to specific offsets + * 2. Reading specific byte ranges + * 3. Verifying each read() call returns the correct number of bytes + * + * This test specifically addresses the bug where read() was returning 0 + * for inline content or -1 prematurely for chunked reads. + */ + @Test + public void testRangeReads() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + String testPath = TEST_ROOT + "/rangereads.dat"; + + // Create a 1275-byte file (similar to the Parquet file size that was failing) + byte[] testData = new byte[1275]; + Random random = new Random(42); // Fixed seed for reproducibility + random.nextBytes(testData); + + // Write file + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + outputStream.write(testData); + outputStream.close(); + + // Read file entry + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + + // Test 1: Read last 8 bytes (like reading Parquet footer length) + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1267); + byte[] buffer = new byte[8]; + int bytesRead = inputStream.read(buffer, 0, 8); + assertEquals("Should read 8 bytes at offset 1267", 8, bytesRead); + assertArrayEquals("Content at offset 1267 should match", + Arrays.copyOfRange(testData, 1267, 1275), buffer); + inputStream.close(); + + // Test 2: Read large chunk in middle (like reading column data) + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(383); + buffer = new byte[884]; // Read bytes 383-1267 + bytesRead = inputStream.read(buffer, 0, 884); + assertEquals("Should read 884 bytes at offset 383", 884, bytesRead); + assertArrayEquals("Content at offset 383 should match", + Arrays.copyOfRange(testData, 383, 1267), buffer); + inputStream.close(); + + // Test 3: Read from beginning (like reading Parquet magic bytes) + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + buffer = new byte[4]; + bytesRead = inputStream.read(buffer, 0, 4); + assertEquals("Should read 4 bytes at offset 0", 4, bytesRead); + assertArrayEquals("Content at offset 0 should match", + Arrays.copyOfRange(testData, 0, 4), buffer); + inputStream.close(); + + // Test 4: Multiple sequential reads without seeking (like + // H2SeekableInputStream.readFully) + // This is the critical test case that was failing! + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1197); // Position where EOF was being returned prematurely + + byte[] fullBuffer = new byte[78]; // Try to read the "missing" 78 bytes + int totalRead = 0; + int offset = 0; + int remaining = 78; + + // Simulate Parquet's H2SeekableInputStream.readFully() loop + while (remaining > 0) { + int read = inputStream.read(fullBuffer, offset, remaining); + if (read == -1) { + fail(String.format( + "Got EOF after reading %d bytes, but expected to read %d more bytes (total requested: 78)", + totalRead, remaining)); + } + assertTrue("Each read() should return positive bytes", read > 0); + totalRead += read; + offset += read; + remaining -= read; + } + + assertEquals("Should read all 78 bytes in readFully loop", 78, totalRead); + assertArrayEquals("Content at offset 1197 should match", + Arrays.copyOfRange(testData, 1197, 1275), fullBuffer); + inputStream.close(); + + // Test 5: Read entire file in one go + inputStream = new SeaweedInputStream(filerClient, testPath, entry); + byte[] allData = new byte[1275]; + bytesRead = inputStream.read(allData, 0, 1275); + assertEquals("Should read entire 1275 bytes", 1275, bytesRead); + assertArrayEquals("Entire content should match", testData, allData); + inputStream.close(); + } +} |
