Diffstat (limited to 'test/java/spark')
-rw-r--r--    33  test/java/spark/.gitignore
-rw-r--r--    75  test/java/spark/Makefile
-rw-r--r--   100  test/java/spark/docker-compose.yml
-rw-r--r--   348  test/java/spark/pom.xml
-rwxr-xr-x   149  test/java/spark/quick-start.sh
-rwxr-xr-x    46  test/java/spark/run-tests.sh
-rw-r--r--   138  test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java
-rw-r--r--   308  test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java
-rw-r--r--   393  test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java
-rw-r--r--   466  test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java
-rw-r--r--   387  test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java
-rw-r--r--   286  test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java
-rw-r--r--   140  test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java
-rw-r--r--   363  test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java
-rw-r--r--   177  test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java
-rw-r--r--   132  test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java
-rw-r--r--   194  test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java
-rw-r--r--   239  test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java
-rw-r--r--   278  test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java
-rw-r--r--   128  test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java
-rw-r--r--    19  test/java/spark/src/test/resources/log4j.properties
21 files changed, 4399 insertions(+), 0 deletions(-)
diff --git a/test/java/spark/.gitignore b/test/java/spark/.gitignore
new file mode 100644
index 000000000..62341354a
--- /dev/null
+++ b/test/java/spark/.gitignore
@@ -0,0 +1,33 @@
+# Maven
+target/
+.m2/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+
+# IDE
+.idea/
+*.iml
+.vscode/
+.classpath
+.project
+.settings/
+
+# Spark
+spark-warehouse/
+metastore_db/
+derby.log
+
+# Logs
+*.log
+
+# OS
+.DS_Store
+Thumbs.db
+
+
diff --git a/test/java/spark/Makefile b/test/java/spark/Makefile
new file mode 100644
index 000000000..462447c66
--- /dev/null
+++ b/test/java/spark/Makefile
@@ -0,0 +1,75 @@
+.PHONY: help build test test-local test-docker clean run-example docker-up docker-down
+
+help:
+ @echo "SeaweedFS Spark Integration Tests"
+ @echo ""
+ @echo "Available targets:"
+ @echo " build - Build the project"
+ @echo " test - Run integration tests (requires SeaweedFS running)"
+ @echo " test-local - Run tests against local SeaweedFS"
+ @echo " test-docker - Run tests in Docker with SeaweedFS"
+ @echo " run-example - Run the example Spark application"
+ @echo " docker-up - Start SeaweedFS in Docker"
+ @echo " docker-down - Stop SeaweedFS Docker containers"
+ @echo " clean - Clean build artifacts"
+
+build:
+ mvn clean package
+
+test:
+ @if [ -z "$$SEAWEEDFS_TEST_ENABLED" ]; then \
+ echo "Setting SEAWEEDFS_TEST_ENABLED=true"; \
+ fi
+ SEAWEEDFS_TEST_ENABLED=true mvn test
+
+test-local:
+ @echo "Testing against local SeaweedFS (localhost:8888)..."
+ ./run-tests.sh
+
+test-docker:
+ @echo "Running tests in Docker..."
+ docker compose up --build --abort-on-container-exit spark-tests
+ docker compose down
+
+docker-up:
+ @echo "Starting SeaweedFS in Docker..."
+ docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
+ @echo "Waiting for services to be ready..."
+ @sleep 5
+ @echo "SeaweedFS is ready!"
+ @echo " Master: http://localhost:9333"
+ @echo " Filer: http://localhost:8888"
+
+docker-down:
+ @echo "Stopping SeaweedFS Docker containers..."
+ docker compose down -v
+
+run-example:
+ @echo "Running example application..."
+ @if ! command -v spark-submit > /dev/null; then \
+ echo "Error: spark-submit not found. Please install Apache Spark."; \
+ exit 1; \
+ fi
+ spark-submit \
+ --class seaweed.spark.SparkSeaweedFSExample \
+ --master local[2] \
+ --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
+ --conf spark.hadoop.fs.seaweed.filer.host=localhost \
+ --conf spark.hadoop.fs.seaweed.filer.port=8888 \
+ --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
+ --conf spark.hadoop.fs.seaweed.replication="" \
+ target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
+ seaweedfs://localhost:8888/spark-example-output
+
+clean:
+ mvn clean
+ @echo "Build artifacts cleaned"
+
+verify-seaweedfs:
+ @echo "Verifying SeaweedFS connection..."
+ @curl -f http://localhost:8888/ > /dev/null 2>&1 && \
+ echo "✓ SeaweedFS filer is accessible" || \
+ (echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"; exit 1)
+
+.DEFAULT_GOAL := help
+
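A typical local sequence with the targets above (illustrative only; it assumes Docker, Maven, and curl are available on the host):

    make docker-up          # start master, volume server, and filer containers
    make verify-seaweedfs   # confirm the filer answers at http://localhost:8888
    make test               # run the JUnit suite with SEAWEEDFS_TEST_ENABLED=true
    make docker-down        # stop the containers and remove the data volume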
diff --git a/test/java/spark/docker-compose.yml b/test/java/spark/docker-compose.yml
new file mode 100644
index 000000000..ed8757b88
--- /dev/null
+++ b/test/java/spark/docker-compose.yml
@@ -0,0 +1,100 @@
+services:
+ seaweedfs-master:
+ build:
+ context: ../../../docker
+ dockerfile: Dockerfile.local
+ image: seaweedfs:local
+ container_name: seaweedfs-spark-master
+ ports:
+ - "9333:9333"
+ - "19333:19333"
+ command: "master -ip=seaweedfs-master -ip.bind=0.0.0.0 -port=9333 -port.grpc=19333 -volumeSizeLimitMB=50 -defaultReplication=000 -peers=none"
+ networks:
+ - seaweedfs-spark
+ healthcheck:
+ test: ["CMD", "wget", "--spider", "-q", "http://localhost:9333/cluster/status"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ start_period: 10s
+
+ seaweedfs-volume:
+ build:
+ context: ../../../docker
+ dockerfile: Dockerfile.local
+ image: seaweedfs:local
+ container_name: seaweedfs-spark-volume
+ ports:
+ - "8080:8080"
+ - "18080:18080"
+ command: "volume -mserver=seaweedfs-master:9333 -ip=seaweedfs-volume -ip.bind=0.0.0.0 -port=8080 -port.grpc=18080 -publicUrl=seaweedfs-volume:8080 -max=100 -dir=/data -preStopSeconds=1"
+ volumes:
+ - seaweedfs-volume-data:/data
+ depends_on:
+ seaweedfs-master:
+ condition: service_healthy
+ networks:
+ - seaweedfs-spark
+ healthcheck:
+ test: ["CMD", "wget", "--spider", "-q", "http://localhost:8080/status"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ start_period: 10s
+
+ seaweedfs-filer:
+ build:
+ context: ../../../docker
+ dockerfile: Dockerfile.local
+ image: seaweedfs:local
+ container_name: seaweedfs-spark-filer
+ ports:
+ - "8888:8888"
+ - "18888:18888"
+ command: "filer -master=seaweedfs-master:9333 -ip=seaweedfs-filer -ip.bind=0.0.0.0 -port=8888 -port.grpc=18888"
+ depends_on:
+ seaweedfs-master:
+ condition: service_healthy
+ seaweedfs-volume:
+ condition: service_healthy
+ networks:
+ - seaweedfs-spark
+ healthcheck:
+ test: ["CMD", "wget", "--spider", "-q", "http://localhost:8888/"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ start_period: 15s
+
+ spark-tests:
+ image: maven:3.9-eclipse-temurin-17
+ container_name: seaweedfs-spark-tests
+ volumes:
+ - .:/workspace
+ - ./.m2:/root/.m2
+ working_dir: /workspace
+ environment:
+ - SEAWEEDFS_TEST_ENABLED=true
+ - SEAWEEDFS_FILER_HOST=seaweedfs-filer
+ - SEAWEEDFS_FILER_PORT=8888
+ - SEAWEEDFS_FILER_GRPC_PORT=18888
+ - HADOOP_HOME=/tmp
+ # Disable Java DNS caching to ensure fresh DNS lookups
+ - MAVEN_OPTS=-Dsun.net.inetaddr.ttl=0 -Dnetworkaddress.cache.ttl=0
+ - SPARK_SUBMIT_OPTS=-Dfs.seaweedfs.impl.disable.cache=true
+ command: sh -c "sleep 30 && mvn clean test"
+ depends_on:
+ seaweedfs-filer:
+ condition: service_healthy
+ networks:
+ - seaweedfs-spark
+ mem_limit: 4g
+ cpus: 2
+
+networks:
+ seaweedfs-spark:
+ driver: bridge
+
+volumes:
+ seaweedfs-volume-data:
+
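The healthchecks above poll each service over HTTP, and the same endpoints are reachable from the host through the published ports, so a quick manual check (illustrative, assuming the default port mappings) is:

    curl -f http://localhost:9333/cluster/status   # master
    curl -f http://localhost:8080/status           # volume server
    curl -f http://localhost:8888/                 # filer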
diff --git a/test/java/spark/pom.xml b/test/java/spark/pom.xml
new file mode 100644
index 000000000..22228a856
--- /dev/null
+++ b/test/java/spark/pom.xml
@@ -0,0 +1,348 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.seaweedfs</groupId>
+ <artifactId>seaweedfs-spark-integration-tests</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>SeaweedFS Spark Integration Tests</name>
+ <description>Integration tests for Apache Spark with SeaweedFS HDFS client</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ <spark.version>3.5.0</spark.version>
+ <hadoop.version>3.3.6</hadoop.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <junit.version>4.13.2</junit.version>
+ <seaweedfs.hadoop3.client.version>3.80.1-SNAPSHOT</seaweedfs.hadoop3.client.version>
+ <jackson.version>2.18.2</jackson.version> <!-- Upgraded from 2.15.3 -->
+ <netty.version>4.1.125.Final</netty.version> <!-- Upgraded to 4.1.125.Final for security fixes (CVE in netty-codec < 4.1.125.Final, netty-codec-http2 <= 4.1.123.Final) -->
+ <parquet.version>1.15.2</parquet.version> <!-- Upgraded to 1.15.2 for security fix -->
+ <parquet.format.version>2.12.0</parquet.format.version>
+ <surefire.jvm.args>
+ -Xmx2g
+ -Dhadoop.home.dir=/tmp
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens=java.base/java.io=ALL-UNNAMED
+ --add-opens=java.base/java.net=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+ --add-opens=java.base/sun.security.action=ALL-UNNAMED
+ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+ --add-exports=java.base/sun.nio.ch=ALL-UNNAMED
+ </surefire.jvm.args>
+ </properties>
+
+ <!-- Override vulnerable transitive dependencies -->
+ <dependencyManagement>
+ <dependencies>
+ <!-- Jackson - Fix CVEs in older versions -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <!-- Netty - Fix CVEs in older versions -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+
+ <!-- Apache Avro - Fix CVEs -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
+
+ <!-- Apache ZooKeeper - Fix CVEs -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.9.4</version>
+ </dependency>
+
+ <!-- Apache Commons - Fix CVEs -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.26.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.15.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>1.11.0</version>
+ </dependency>
+
+ <!-- Guava - Fix CVEs -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>32.1.3-jre</version>
+ </dependency>
+
+ <!-- SnakeYAML - Fix CVEs -->
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>2.2</version>
+ </dependency>
+
+ <!-- Protobuf - Fix CVEs -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.25.5</version>
+ </dependency>
+
+ <!-- Nimbus JOSE JWT - Fix CVEs (GHSA-xwmg-2g98-w7v9 and others) -->
+ <dependency>
+ <groupId>com.nimbusds</groupId>
+ <artifactId>nimbus-jose-jwt</artifactId>
+ <version>10.0.2</version>
+ </dependency>
+
+ <!-- Snappy Java - Fix CVEs -->
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.10.4</version>
+ </dependency>
+
+ <!-- DNS Java - Fix CVEs -->
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ <version>3.6.0</version>
+ </dependency>
+
+ <!-- Apache Parquet - Upgrade to latest for bug fixes -->
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-common</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-encoding</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format-structures</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format</artifactId>
+ <version>${parquet.format.version}</version>
+ </dependency>
+
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <!-- Spark Core -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Spark SQL -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Hadoop Client -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- SeaweedFS Hadoop3 Client -->
+ <dependency>
+ <groupId>com.seaweedfs</groupId>
+ <artifactId>seaweedfs-hadoop3-client</artifactId>
+ <version>${seaweedfs.hadoop3.client.version}</version>
+ </dependency>
+
+ <!-- Testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.36</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ <version>1.7.36</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.11.0</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <skipTests>${skipTests}</skipTests>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ <argLine>${surefire.jvm.args}</argLine>
+ <systemPropertyVariables>
+ <log4j.configuration>file:${project.basedir}/src/test/resources/log4j.properties</log4j.configuration>
+ </systemPropertyVariables>
+ <environmentVariables>
+ <HADOOP_HOME>/tmp</HADOOP_HOME>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+
+ <!-- Shade plugin to create fat jar for Spark submit -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.5.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>seaweed.spark.SparkSeaweedFSExample</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
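Because the Surefire configuration above injects the --add-opens flags, HADOOP_HOME, and the log4j properties automatically, a single test class can be run straight from Maven; for example (illustrative, using Surefire's standard -Dtest property):

    SEAWEEDFS_TEST_ENABLED=true mvn test -Dtest=SparkReadWriteTest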
diff --git a/test/java/spark/quick-start.sh b/test/java/spark/quick-start.sh
new file mode 100755
index 000000000..974363311
--- /dev/null
+++ b/test/java/spark/quick-start.sh
@@ -0,0 +1,149 @@
+#!/bin/bash
+
+set -e
+
+echo "=== SeaweedFS Spark Integration Tests Quick Start ==="
+echo ""
+
+# Check if SeaweedFS is running
+check_seaweedfs() {
+ echo "Checking if SeaweedFS is running..."
+ if curl -f http://localhost:8888/ > /dev/null 2>&1; then
+ echo "✓ SeaweedFS filer is accessible at http://localhost:8888"
+ return 0
+ else
+ echo "✗ SeaweedFS filer is not accessible"
+ return 1
+ fi
+}
+
+# Start SeaweedFS with Docker if not running
+start_seaweedfs() {
+ echo ""
+ echo "Starting SeaweedFS with Docker..."
+ docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer
+
+ echo "Waiting for SeaweedFS to be ready..."
+ for i in {1..30}; do
+ if curl -f http://localhost:8888/ > /dev/null 2>&1; then
+ echo "✓ SeaweedFS is ready!"
+ return 0
+ fi
+ echo -n "."
+ sleep 2
+ done
+
+ echo ""
+ echo "✗ SeaweedFS failed to start"
+ return 1
+}
+
+# Build the project
+build_project() {
+ echo ""
+ echo "Building the project..."
+ mvn clean package -DskipTests
+ echo "✓ Build completed"
+}
+
+# Run tests
+run_tests() {
+ echo ""
+ echo "Running integration tests..."
+ export SEAWEEDFS_TEST_ENABLED=true
+ mvn test
+ echo "✓ Tests completed"
+}
+
+# Run example
+run_example() {
+ echo ""
+ echo "Running example application..."
+
+ if ! command -v spark-submit > /dev/null; then
+ echo "⚠ spark-submit not found. Skipping example application."
+ echo "To run the example, install Apache Spark and try: make run-example"
+ return 0
+ fi
+
+ spark-submit \
+ --class seaweed.spark.SparkSeaweedFSExample \
+ --master local[2] \
+ --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
+ --conf spark.hadoop.fs.seaweed.filer.host=localhost \
+ --conf spark.hadoop.fs.seaweed.filer.port=8888 \
+ --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
+ target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
+ seaweedfs://localhost:8888/spark-quickstart-output
+
+ echo "✓ Example completed"
+}
+
+# Cleanup
+cleanup() {
+ echo ""
+ echo "Cleaning up..."
+ docker-compose down -v
+ echo "✓ Cleanup completed"
+}
+
+# Main execution
+main() {
+ # Check if Docker is available
+ if ! command -v docker > /dev/null; then
+ echo "Error: Docker is not installed or not in PATH"
+ exit 1
+ fi
+
+ # Check if Maven is available
+ if ! command -v mvn > /dev/null; then
+ echo "Error: Maven is not installed or not in PATH"
+ exit 1
+ fi
+
+ # Check if SeaweedFS is running, if not start it
+ if ! check_seaweedfs; then
+ read -p "Do you want to start SeaweedFS with Docker? (y/n) " -n 1 -r
+ echo
+ if [[ $REPLY =~ ^[Yy]$ ]]; then
+ start_seaweedfs || exit 1
+ else
+ echo "Please start SeaweedFS manually and rerun this script."
+ exit 1
+ fi
+ fi
+
+ # Build project
+ build_project || exit 1
+
+ # Run tests
+ run_tests || exit 1
+
+ # Run example if Spark is available
+ run_example
+
+ echo ""
+ echo "=== Quick Start Completed Successfully! ==="
+ echo ""
+ echo "Next steps:"
+ echo " - View test results in target/surefire-reports/"
+ echo " - Check example output at http://localhost:8888/"
+ echo " - Run 'make help' for more options"
+ echo " - Read README.md for detailed documentation"
+ echo ""
+
+ read -p "Do you want to stop SeaweedFS? (y/n) " -n 1 -r
+ echo
+ if [[ $REPLY =~ ^[Yy]$ ]]; then
+ cleanup
+ fi
+}
+
+# Handle Ctrl+C
+trap cleanup INT
+
+# Run main
+main
+
+
+
diff --git a/test/java/spark/run-tests.sh b/test/java/spark/run-tests.sh
new file mode 100755
index 000000000..f637c8c59
--- /dev/null
+++ b/test/java/spark/run-tests.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+set -e
+
+echo "=== SeaweedFS Spark Integration Tests Runner ==="
+echo ""
+
+# Check if SeaweedFS is running
+check_seaweedfs() {
+ if curl -f http://localhost:8888/ > /dev/null 2>&1; then
+ echo "✓ SeaweedFS filer is accessible at http://localhost:8888"
+ return 0
+ else
+ echo "✗ SeaweedFS filer is not accessible"
+ return 1
+ fi
+}
+
+# Main
+if ! check_seaweedfs; then
+ echo ""
+ echo "Please start SeaweedFS first. You can use:"
+ echo " cd test/java/spark && docker-compose up -d"
+ echo "Or:"
+ echo " make docker-up"
+ exit 1
+fi
+
+echo ""
+echo "Running Spark integration tests..."
+echo ""
+
+export SEAWEEDFS_TEST_ENABLED=true
+export SEAWEEDFS_FILER_HOST=localhost
+export SEAWEEDFS_FILER_PORT=8888
+export SEAWEEDFS_FILER_GRPC_PORT=18888
+
+# Run tests
+mvn test "$@"
+
+echo ""
+echo "✓ Test run completed"
+echo "View detailed reports in: target/surefire-reports/"
+
+
+
diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java
new file mode 100644
index 000000000..75b2d710b
--- /dev/null
+++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java
@@ -0,0 +1,138 @@
+package seaweed.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Example Spark application demonstrating SeaweedFS integration.
+ *
+ * This can be submitted to a Spark cluster using spark-submit.
+ *
+ * Example usage:
+ * spark-submit \
+ * --class seaweed.spark.SparkSeaweedFSExample \
+ * --master local[2] \
+ * --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
+ * --conf spark.hadoop.fs.seaweed.filer.host=localhost \
+ * --conf spark.hadoop.fs.seaweed.filer.port=8888 \
+ * --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
+ * target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
+ * seaweedfs://localhost:8888/output
+ */
+public class SparkSeaweedFSExample {
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.err.println("Usage: SparkSeaweedFSExample <output-path>");
+ System.err.println("Example: seaweedfs://localhost:8888/spark-output");
+ System.exit(1);
+ }
+
+ String outputPath = args[0];
+
+ // Create Spark session
+ SparkSession spark = SparkSession.builder()
+ .appName("SeaweedFS Spark Example")
+ .getOrCreate();
+
+ try {
+ System.out.println("=== SeaweedFS Spark Integration Example ===\n");
+
+ // Example 1: Generate data and write to SeaweedFS
+ System.out.println("1. Generating sample data...");
+ Dataset<Row> data = spark.range(0, 1000)
+ .selectExpr(
+ "id",
+ "id * 2 as doubled",
+ "CAST(rand() * 100 AS INT) as random_value");
+
+ System.out.println(" Generated " + data.count() + " rows");
+ data.show(5);
+
+ // Write as Parquet
+ String parquetPath = outputPath + "/data.parquet";
+ System.out.println("\n2. Writing data to SeaweedFS as Parquet...");
+ System.out.println(" Path: " + parquetPath);
+
+ data.write()
+ .mode(SaveMode.Overwrite)
+ .parquet(parquetPath);
+
+ System.out.println(" ✓ Write completed");
+
+ // Read back and verify
+ System.out.println("\n3. Reading data back from SeaweedFS...");
+ Dataset<Row> readData = spark.read().parquet(parquetPath);
+ System.out.println(" Read " + readData.count() + " rows");
+
+ // Perform aggregation
+ System.out.println("\n4. Performing aggregation...");
+ Dataset<Row> stats = readData.selectExpr(
+ "COUNT(*) as count",
+ "AVG(random_value) as avg_random",
+ "MAX(doubled) as max_doubled");
+
+ stats.show();
+
+ // Write aggregation results
+ String statsPath = outputPath + "/stats.parquet";
+ System.out.println("5. Writing stats to: " + statsPath);
+ stats.write()
+ .mode(SaveMode.Overwrite)
+ .parquet(statsPath);
+
+ // Create a partitioned dataset
+ System.out.println("\n6. Creating partitioned dataset...");
+ Dataset<Row> partitionedData = data.selectExpr(
+ "*",
+ "CAST(id % 10 AS INT) as partition_key");
+
+ String partitionedPath = outputPath + "/partitioned.parquet";
+ System.out.println(" Path: " + partitionedPath);
+
+ partitionedData.write()
+ .mode(SaveMode.Overwrite)
+ .partitionBy("partition_key")
+ .parquet(partitionedPath);
+
+ System.out.println(" ✓ Partitioned write completed");
+
+ // Read specific partition
+ System.out.println("\n7. Reading specific partition (partition_key=0)...");
+ Dataset<Row> partition0 = spark.read()
+ .parquet(partitionedPath)
+ .filter("partition_key = 0");
+
+ System.out.println(" Partition 0 contains " + partition0.count() + " rows");
+ partition0.show(5);
+
+ // SQL example
+ System.out.println("\n8. Using Spark SQL...");
+ readData.createOrReplaceTempView("seaweedfs_data");
+
+ Dataset<Row> sqlResult = spark.sql(
+ "SELECT " +
+ " CAST(id / 100 AS INT) as bucket, " +
+ " COUNT(*) as count, " +
+ " AVG(random_value) as avg_random " +
+ "FROM seaweedfs_data " +
+ "GROUP BY CAST(id / 100 AS INT) " +
+ "ORDER BY bucket");
+
+ System.out.println(" Bucketed statistics:");
+ sqlResult.show();
+
+ System.out.println("\n=== Example completed successfully! ===");
+ System.out.println("Output location: " + outputPath);
+
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ } finally {
+ spark.stop();
+ }
+ }
+}
diff --git a/test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java b/test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java
new file mode 100644
index 000000000..86dde66ab
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java
@@ -0,0 +1,308 @@
+package seaweed.spark;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+import seaweedfs.client.SeaweedOutputStream;
+import seaweedfs.client.SeaweedRead;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test to reproduce the Parquet EOF issue.
+ *
+ * The issue: When Parquet writes column chunks, it calls getPos() to record
+ * offsets. If getPos() returns a position that doesn't include buffered
+ * (unflushed) data, the footer metadata will have incorrect offsets.
+ *
+ * This test simulates Parquet's behavior:
+ * 1. Write some data (column chunk 1)
+ * 2. Call getPos() - Parquet records this as the END of chunk 1
+ * 3. Write more data (column chunk 2)
+ * 4. Call getPos() - Parquet records this as the END of chunk 2
+ * 5. Close the file
+ * 6. Verify that the recorded positions match the actual file content
+ *
+ * Prerequisites:
+ * - SeaweedFS master, volume server, and filer must be running
+ * - Default ports: filer HTTP 8888, filer gRPC 18888
+ *
+ * To run:
+ * export SEAWEEDFS_TEST_ENABLED=true
+ * cd test/java/spark
+ * mvn test -Dtest=GetPosBufferTest
+ */
+public class GetPosBufferTest {
+
+ private FilerClient filerClient;
+ private static final String TEST_ROOT = "/test-getpos-buffer";
+ private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+
+ @Before
+ public void setUp() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+
+ String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+ String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888");
+
+ filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort));
+
+ // Clean up any existing test directory
+ if (filerClient.exists(TEST_ROOT)) {
+ filerClient.rm(TEST_ROOT, true, true);
+ }
+
+ // Create test root directory
+ filerClient.mkdirs(TEST_ROOT, 0755);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ if (filerClient != null) {
+ filerClient.rm(TEST_ROOT, true, true);
+ filerClient.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetPosWithBufferedData() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with buffered data ===");
+
+ String testPath = TEST_ROOT + "/getpos-test.bin";
+
+ // Simulate what Parquet does when writing column chunks
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write "column chunk 1" - 100 bytes
+ byte[] chunk1 = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ chunk1[i] = (byte) i;
+ }
+ outputStream.write(chunk1);
+
+ // Parquet calls getPos() here to record end of chunk 1
+ long posAfterChunk1 = outputStream.getPos();
+ System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1);
+ assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1);
+
+ // Write "column chunk 2" - 200 bytes
+ byte[] chunk2 = new byte[200];
+ for (int i = 0; i < 200; i++) {
+ chunk2[i] = (byte) (i + 100);
+ }
+ outputStream.write(chunk2);
+
+ // Parquet calls getPos() here to record end of chunk 2
+ long posAfterChunk2 = outputStream.getPos();
+ System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2);
+ assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2);
+
+ // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!)
+ byte[] chunk3 = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ chunk3[i] = (byte) (i + 50);
+ }
+ outputStream.write(chunk3);
+
+ // Parquet calls getPos() here to record end of chunk 3
+ long posAfterChunk3 = outputStream.getPos();
+ System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3);
+ assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3);
+
+ // Close to flush everything
+ outputStream.close();
+ System.out.println("File closed successfully");
+
+ // Now read the file and verify its actual size matches what getPos() reported
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+
+ long actualFileSize = SeaweedRead.fileSize(entry);
+ System.out.println("Actual file size on disk: " + actualFileSize);
+
+ assertEquals("File size should match the last getPos() value", 378, actualFileSize);
+
+ // Now read the file and verify we can read all the data
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ byte[] readBuffer = new byte[500]; // Larger buffer to read everything
+ int totalRead = 0;
+ int bytesRead;
+ while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) {
+ totalRead += bytesRead;
+ }
+ inputStream.close();
+
+ System.out.println("Total bytes read: " + totalRead);
+ assertEquals("Should read exactly 378 bytes", 378, totalRead);
+
+ // Verify the data is correct
+ for (int i = 0; i < 100; i++) {
+ assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]);
+ }
+ for (int i = 0; i < 200; i++) {
+ assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]);
+ }
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]);
+ }
+
+ System.out.println("SUCCESS: All data verified correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithSmallWrites() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ===");
+
+ String testPath = TEST_ROOT + "/small-writes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Parquet writes column data in small chunks and frequently calls getPos()
+ String[] columnData = { "Alice", "Bob", "Charlie", "David" };
+ long[] recordedPositions = new long[columnData.length];
+
+ for (int i = 0; i < columnData.length; i++) {
+ byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8);
+ outputStream.write(data);
+
+ // Parquet calls getPos() after each value to track offsets
+ recordedPositions[i] = outputStream.getPos();
+ System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]);
+ }
+
+ long finalPos = outputStream.getPos();
+ System.out.println("Final position before close: " + finalPos);
+
+ outputStream.close();
+
+ // Verify file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size should match final getPos()", finalPos, actualFileSize);
+
+ // Verify we can read using the recorded positions
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+
+ long currentPos = 0;
+ for (int i = 0; i < columnData.length; i++) {
+ long nextPos = recordedPositions[i];
+ int length = (int) (nextPos - currentPos);
+
+ byte[] buffer = new byte[length];
+ int bytesRead = inputStream.read(buffer, 0, length);
+
+ assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead);
+
+ String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
+ System.out.println("Read at offset " + currentPos + ": '" + readData + "'");
+ assertEquals("Data mismatch", columnData[i], readData);
+
+ currentPos = nextPos;
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n");
+ }
+
+ @Test
+ public void testGetPosWithExactly78BytesBuffered() throws IOException {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ===");
+
+ String testPath = TEST_ROOT + "/78-bytes-test.bin";
+
+ SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath);
+
+ // Write some initial data
+ byte[] initial = new byte[1000];
+ for (int i = 0; i < 1000; i++) {
+ initial[i] = (byte) i;
+ }
+ outputStream.write(initial);
+ outputStream.flush(); // Ensure this is flushed
+
+ long posAfterFlush = outputStream.getPos();
+ System.out.println("Position after 1000 bytes + flush: " + posAfterFlush);
+ assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush);
+
+ // Now write EXACTLY 78 bytes (the problematic buffer size in our bug)
+ byte[] problematicChunk = new byte[78];
+ for (int i = 0; i < 78; i++) {
+ problematicChunk[i] = (byte) (i + 50);
+ }
+ outputStream.write(problematicChunk);
+
+ // DO NOT FLUSH - this is the bug scenario!
+ // Parquet calls getPos() here while the 78 bytes are still buffered
+ long posWithBufferedData = outputStream.getPos();
+ System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData);
+
+ // This MUST return 1078, not 1000!
+ assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData);
+
+ // Now close (which will flush)
+ outputStream.close();
+
+ // Verify actual file size
+ FilerProto.Entry entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(testPath),
+ SeaweedOutputStream.getFileName(testPath));
+ long actualFileSize = SeaweedRead.fileSize(entry);
+
+ System.out.println("Actual file size: " + actualFileSize);
+ assertEquals("File size must be 1078", 1078, actualFileSize);
+
+ // Try to read at position 1000 for 78 bytes (what Parquet would try)
+ SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry);
+ inputStream.seek(1000);
+
+ byte[] readBuffer = new byte[78];
+ int bytesRead = inputStream.read(readBuffer, 0, 78);
+
+ System.out.println("Bytes read at position 1000: " + bytesRead);
+ assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead);
+
+ // Verify the data matches
+ for (int i = 0; i < 78; i++) {
+ assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]);
+ }
+
+ inputStream.close();
+
+ System.out.println("SUCCESS: getPos() correctly includes buffered data!\n");
+ }
+}
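The invariant these assertions exercise is that getPos() must count bytes accepted by write() even while they are still buffered, not only bytes already flushed to the filer. A minimal sketch of that invariant, using a hypothetical stand-in class rather than the real seaweedfs.client.SeaweedOutputStream:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Sketch only: a position-tracking stream whose getPos() includes
    // buffered bytes. Field names are illustrative, not the actual
    // SeaweedOutputStream implementation.
    public class PositionTrackingOutputStream extends OutputStream {
        private long flushedBytes = 0;                       // bytes already pushed downstream
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override
        public void write(int b) {
            buffer.write(b);                                 // buffered, not yet flushed
        }

        @Override
        public void flush() throws IOException {
            flushedBytes += buffer.size();                   // pretend the buffer went downstream
            buffer.reset();
        }

        // Parquet records this value as a column-chunk offset, so it must
        // reflect flushed + buffered bytes, never just flushed bytes.
        public long getPos() {
            return flushedBytes + buffer.size();
        }
    }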
diff --git a/test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java
new file mode 100644
index 000000000..0cfe2a53b
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java
@@ -0,0 +1,393 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Compare InputStream behavior between local disk and SeaweedFS
+ * to understand why Spark's ParquetFileReader fails with SeaweedFS.
+ */
+public class InputStreamComparisonTest extends SparkTestBase {
+
+ private static class ReadOperation {
+ String source;
+ String operation;
+ long position;
+ int requestedBytes;
+ int returnedBytes;
+ boolean isEOF;
+ long timestamp;
+
+ ReadOperation(String source, String operation, long position, int requestedBytes,
+ int returnedBytes, boolean isEOF) {
+ this.source = source;
+ this.operation = operation;
+ this.position = position;
+ this.requestedBytes = requestedBytes;
+ this.returnedBytes = returnedBytes;
+ this.isEOF = isEOF;
+ this.timestamp = System.nanoTime();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[%s] %s: pos=%d, requested=%d, returned=%d, EOF=%b",
+ source, operation, position, requestedBytes, returnedBytes, isEOF);
+ }
+ }
+
+ private static class LoggingInputStream extends InputStream {
+ private final FSDataInputStream wrapped;
+ private final String source;
+ private final List<ReadOperation> operations;
+ private long position = 0;
+
+ LoggingInputStream(FSDataInputStream wrapped, String source, List<ReadOperation> operations) {
+ this.wrapped = wrapped;
+ this.source = source;
+ this.operations = operations;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = wrapped.read();
+ operations.add(new ReadOperation(source, "read()", position, 1,
+ result == -1 ? 0 : 1, result == -1));
+ if (result != -1)
+ position++;
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int result = wrapped.read(b, off, len);
+ operations.add(new ReadOperation(source, "read(byte[])", position, len,
+ result == -1 ? 0 : result, result == -1));
+ if (result > 0)
+ position += result;
+ return result;
+ }
+
+ public int read(ByteBuffer buf) throws IOException {
+ int requested = buf.remaining();
+ long startPos = position;
+
+ // Use reflection to call read(ByteBuffer) if available
+ try {
+ java.lang.reflect.Method method = wrapped.getClass().getMethod("read", ByteBuffer.class);
+ int result = (int) method.invoke(wrapped, buf);
+ operations.add(new ReadOperation(source, "read(ByteBuffer)", startPos, requested,
+ result == -1 ? 0 : result, result == -1));
+ if (result > 0)
+ position += result;
+ return result;
+ } catch (Exception e) {
+ // Fallback to byte array read
+ byte[] temp = new byte[requested];
+ int result = wrapped.read(temp, 0, requested);
+ if (result > 0) {
+ buf.put(temp, 0, result);
+ }
+ operations.add(new ReadOperation(source, "read(ByteBuffer-fallback)", startPos, requested,
+ result == -1 ? 0 : result, result == -1));
+ if (result > 0)
+ position += result;
+ return result;
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long result = wrapped.skip(n);
+ operations.add(new ReadOperation(source, "skip()", position, (int) n, (int) result, false));
+ position += result;
+ return result;
+ }
+
+ @Override
+ public int available() throws IOException {
+ int result = wrapped.available();
+ operations.add(new ReadOperation(source, "available()", position, 0, result, false));
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {
+ operations.add(new ReadOperation(source, "close()", position, 0, 0, false));
+ wrapped.close();
+ }
+
+ public void seek(long pos) throws IOException {
+ wrapped.seek(pos);
+ operations.add(new ReadOperation(source, "seek()", position, 0, 0, false));
+ position = pos;
+ }
+
+ public long getPos() throws IOException {
+ long pos = wrapped.getPos();
+ operations.add(new ReadOperation(source, "getPos()", position, 0, 0, false));
+ return pos;
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.setUpSpark();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.tearDownSpark();
+ }
+
+ @Test
+ public void testCompareInputStreamBehavior() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ REAL-TIME INPUTSTREAM COMPARISON: LOCAL vs SEAWEEDFS ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ // Write a Parquet file to both locations
+ System.out.println("\n1. Writing identical Parquet files...");
+
+ List<SparkSQLTest.Employee> employees = java.util.Arrays.asList(
+ new SparkSQLTest.Employee(1, "Alice", "Engineering", 100000),
+ new SparkSQLTest.Employee(2, "Bob", "Sales", 80000),
+ new SparkSQLTest.Employee(3, "Charlie", "Engineering", 120000),
+ new SparkSQLTest.Employee(4, "David", "Sales", 75000));
+
+ org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df = spark.createDataFrame(employees,
+ SparkSQLTest.Employee.class);
+
+ String localPath = "file:///workspace/target/test-output/comparison-local";
+ String seaweedPath = getTestPath("comparison-seaweed");
+
+ // Ensure directory exists
+ new java.io.File("/workspace/target/test-output").mkdirs();
+
+ df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(localPath);
+ df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(seaweedPath);
+
+ System.out.println(" ✅ Files written");
+
+ // Find the actual parquet files
+ Configuration conf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT));
+ FileSystem seaweedFs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s",
+ SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf);
+
+ // Find parquet files
+ Path localFile = findParquetFile(localFs, new Path(localPath));
+ Path seaweedFile = findParquetFile(seaweedFs, new Path(seaweedPath));
+
+ assertNotNull("Local parquet file not found", localFile);
+ assertNotNull("SeaweedFS parquet file not found", seaweedFile);
+
+ System.out.println("\n2. Comparing file sizes...");
+ long localSize = localFs.getFileStatus(localFile).getLen();
+ long seaweedSize = seaweedFs.getFileStatus(seaweedFile).getLen();
+ System.out.println(" Local: " + localSize + " bytes");
+ System.out.println(" SeaweedFS: " + seaweedSize + " bytes");
+
+ // NOW: Open both streams with logging wrappers
+ List<ReadOperation> localOps = new ArrayList<>();
+ List<ReadOperation> seaweedOps = new ArrayList<>();
+
+ System.out.println("\n3. Opening streams with logging wrappers...");
+
+ FSDataInputStream localStream = localFs.open(localFile);
+ FSDataInputStream seaweedStream = seaweedFs.open(seaweedFile);
+
+ LoggingInputStream localLogging = new LoggingInputStream(localStream, "LOCAL", localOps);
+ LoggingInputStream seaweedLogging = new LoggingInputStream(seaweedStream, "SEAWEED", seaweedOps);
+
+ System.out.println(" ✅ Streams opened");
+
+ // Create a dual-reader that calls both and compares
+ System.out.println("\n4. Performing synchronized read operations...");
+ System.out.println(" (Each operation is called on BOTH streams and results are compared)\n");
+
+ int opCount = 0;
+ boolean mismatchFound = false;
+
+ // Operation 1: Read 4 bytes (magic bytes)
+ opCount++;
+ System.out.println(" Op " + opCount + ": read(4 bytes) - Reading magic bytes");
+ byte[] localBuf1 = new byte[4];
+ byte[] seaweedBuf1 = new byte[4];
+ int localRead1 = localLogging.read(localBuf1, 0, 4);
+ int seaweedRead1 = seaweedLogging.read(seaweedBuf1, 0, 4);
+ System.out.println(" LOCAL: returned " + localRead1 + " bytes: " + bytesToHex(localBuf1));
+ System.out.println(" SEAWEED: returned " + seaweedRead1 + " bytes: " + bytesToHex(seaweedBuf1));
+ if (localRead1 != seaweedRead1 || !java.util.Arrays.equals(localBuf1, seaweedBuf1)) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 2: Seek to end - 8 bytes (footer length + magic)
+ opCount++;
+ System.out.println("\n Op " + opCount + ": seek(fileSize - 8) - Jump to footer");
+ localLogging.seek(localSize - 8);
+ seaweedLogging.seek(seaweedSize - 8);
+ System.out.println(" LOCAL: seeked to " + localLogging.getPos());
+ System.out.println(" SEAWEED: seeked to " + seaweedLogging.getPos());
+ if (localLogging.getPos() != seaweedLogging.getPos()) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 3: Read 8 bytes (footer length + magic)
+ opCount++;
+ System.out.println("\n Op " + opCount + ": read(8 bytes) - Reading footer length + magic");
+ byte[] localBuf2 = new byte[8];
+ byte[] seaweedBuf2 = new byte[8];
+ int localRead2 = localLogging.read(localBuf2, 0, 8);
+ int seaweedRead2 = seaweedLogging.read(seaweedBuf2, 0, 8);
+ System.out.println(" LOCAL: returned " + localRead2 + " bytes: " + bytesToHex(localBuf2));
+ System.out.println(" SEAWEED: returned " + seaweedRead2 + " bytes: " + bytesToHex(seaweedBuf2));
+ if (localRead2 != seaweedRead2 || !java.util.Arrays.equals(localBuf2, seaweedBuf2)) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 4: Calculate footer offset and seek to it
+ int footerLength = java.nio.ByteBuffer.wrap(localBuf2, 0, 4).order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt();
+ long footerOffset = localSize - 8 - footerLength;
+
+ opCount++;
+ System.out.println("\n Op " + opCount + ": seek(" + footerOffset + ") - Jump to footer start");
+ System.out.println(" Footer length: " + footerLength + " bytes");
+ localLogging.seek(footerOffset);
+ seaweedLogging.seek(footerOffset);
+ System.out.println(" LOCAL: seeked to " + localLogging.getPos());
+ System.out.println(" SEAWEED: seeked to " + seaweedLogging.getPos());
+ if (localLogging.getPos() != seaweedLogging.getPos()) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 5: Read entire footer
+ opCount++;
+ System.out.println("\n Op " + opCount + ": read(" + footerLength + " bytes) - Reading footer metadata");
+ byte[] localFooter = new byte[footerLength];
+ byte[] seaweedFooter = new byte[footerLength];
+ int localRead3 = localLogging.read(localFooter, 0, footerLength);
+ int seaweedRead3 = seaweedLogging.read(seaweedFooter, 0, footerLength);
+ System.out.println(" LOCAL: returned " + localRead3 + " bytes");
+ System.out.println(" SEAWEED: returned " + seaweedRead3 + " bytes");
+ if (localRead3 != seaweedRead3 || !java.util.Arrays.equals(localFooter, seaweedFooter)) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ // Show first difference
+ for (int i = 0; i < Math.min(localRead3, seaweedRead3); i++) {
+ if (localFooter[i] != seaweedFooter[i]) {
+ System.out.println(" First difference at byte " + i + ": LOCAL=" +
+ String.format("0x%02X", localFooter[i]) + " SEAWEED=" +
+ String.format("0x%02X", seaweedFooter[i]));
+ break;
+ }
+ }
+ } else {
+ System.out.println(" ✅ Match - Footer metadata is IDENTICAL");
+ }
+
+ // Operation 6: Try reading past EOF
+ opCount++;
+ System.out.println("\n Op " + opCount + ": read(100 bytes) - Try reading past EOF");
+ byte[] localBuf3 = new byte[100];
+ byte[] seaweedBuf3 = new byte[100];
+ int localRead4 = localLogging.read(localBuf3, 0, 100);
+ int seaweedRead4 = seaweedLogging.read(seaweedBuf3, 0, 100);
+ System.out.println(" LOCAL: returned " + localRead4);
+ System.out.println(" SEAWEED: returned " + seaweedRead4);
+ if (localRead4 != seaweedRead4) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match - Both returned EOF");
+ }
+
+ localLogging.close();
+ seaweedLogging.close();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ COMPARISON SUMMARY ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ System.out.println(" Total operations: " + opCount);
+ System.out.println(" LOCAL operations: " + localOps.size());
+ System.out.println(" SEAWEED operations: " + seaweedOps.size());
+
+ if (mismatchFound) {
+ System.out.println("\n ❌ MISMATCHES FOUND - Streams behave differently!");
+ } else {
+ System.out.println("\n ✅ ALL OPERATIONS MATCH - Streams are identical!");
+ }
+
+ System.out.println("\n Detailed operation log:");
+ System.out.println(" ----------------------");
+ for (int i = 0; i < Math.max(localOps.size(), seaweedOps.size()); i++) {
+ if (i < localOps.size()) {
+ System.out.println(" " + localOps.get(i));
+ }
+ if (i < seaweedOps.size()) {
+ System.out.println(" " + seaweedOps.get(i));
+ }
+ }
+
+ assertFalse("Streams should behave identically", mismatchFound);
+ }
+
+ private String bytesToHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder();
+ for (byte b : bytes) {
+ sb.append(String.format("%02X ", b));
+ }
+ return sb.toString().trim();
+ }
+
+ private Path findParquetFile(FileSystem fs, Path dir) throws IOException {
+ org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(dir);
+ for (org.apache.hadoop.fs.FileStatus file : files) {
+ if (file.getPath().getName().endsWith(".parquet") &&
+ !file.getPath().getName().startsWith("_")) {
+ return file.getPath();
+ }
+ }
+ return null;
+ }
+}
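Operations 2 through 5 above follow the standard Parquet footer layout: the file ends with the footer metadata, a 4-byte little-endian footer length, and the 4-byte magic "PAR1". A self-contained sketch of that probe against any Hadoop FileSystem (plain FSDataInputStream calls, without the test's logging wrappers) looks roughly like this:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch: locate and read the Parquet footer the same way the test's
    // operations 2-5 do (seek to EOF-8, parse length + magic, read footer).
    public class ParquetFooterProbe {
        public static byte[] readFooter(FileSystem fs, Path file) throws IOException {
            long fileSize = fs.getFileStatus(file).getLen();
            try (FSDataInputStream in = fs.open(file)) {
                byte[] tail = new byte[8];
                in.seek(fileSize - 8);
                in.readFully(tail);                          // 4-byte length + "PAR1"

                String magic = new String(tail, 4, 4, StandardCharsets.US_ASCII);
                if (!"PAR1".equals(magic)) {
                    throw new IOException("Not a Parquet file: bad magic " + magic);
                }
                int footerLength = ByteBuffer.wrap(tail, 0, 4)
                        .order(ByteOrder.LITTLE_ENDIAN).getInt();

                byte[] footer = new byte[footerLength];
                in.seek(fileSize - 8 - footerLength);        // footer sits just before the tail
                in.readFully(footer);
                return footer;
            }
        }
    }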
diff --git a/test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java
new file mode 100644
index 000000000..487cafc69
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java
@@ -0,0 +1,466 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Compare OutputStream behavior between local disk and SeaweedFS
+ * to understand why Parquet files written to SeaweedFS have incorrect metadata.
+ */
+public class OutputStreamComparisonTest extends SparkTestBase {
+
+ private static class WriteOperation {
+ String source;
+ String operation;
+ long positionBefore;
+ long positionAfter;
+ int bytesWritten;
+ long timestamp;
+ String details;
+
+ WriteOperation(String source, String operation, long positionBefore, long positionAfter,
+ int bytesWritten, String details) {
+ this.source = source;
+ this.operation = operation;
+ this.positionBefore = positionBefore;
+ this.positionAfter = positionAfter;
+ this.bytesWritten = bytesWritten;
+ this.timestamp = System.nanoTime();
+ this.details = details;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[%s] %s: posBefore=%d, posAfter=%d, written=%d %s",
+ source, operation, positionBefore, positionAfter, bytesWritten,
+ details != null ? "(" + details + ")" : "");
+ }
+ }
+
+ private static class LoggingOutputStream extends OutputStream {
+ private final FSDataOutputStream wrapped;
+ private final String source;
+ private final List<WriteOperation> operations;
+
+ LoggingOutputStream(FSDataOutputStream wrapped, String source, List<WriteOperation> operations) {
+ this.wrapped = wrapped;
+ this.source = source;
+ this.operations = operations;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ long posBefore = wrapped.getPos();
+ wrapped.write(b);
+ long posAfter = wrapped.getPos();
+ operations.add(new WriteOperation(source, "write(int)", posBefore, posAfter, 1, null));
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ long posBefore = wrapped.getPos();
+ wrapped.write(b, off, len);
+ long posAfter = wrapped.getPos();
+ operations.add(new WriteOperation(source, "write(byte[])", posBefore, posAfter, len,
+ "len=" + len));
+ }
+
+ @Override
+ public void flush() throws IOException {
+ long posBefore = wrapped.getPos();
+ wrapped.flush();
+ long posAfter = wrapped.getPos();
+ operations.add(new WriteOperation(source, "flush()", posBefore, posAfter, 0, null));
+ }
+
+ @Override
+ public void close() throws IOException {
+ long posBefore = wrapped.getPos();
+ wrapped.close();
+ long posAfter = 0; // Can't call getPos() after close
+ operations.add(new WriteOperation(source, "close()", posBefore, posAfter, 0,
+ "finalPos=" + posBefore));
+ }
+
+ public long getPos() throws IOException {
+ long pos = wrapped.getPos();
+ operations.add(new WriteOperation(source, "getPos()", pos, pos, 0, "returned=" + pos));
+ return pos;
+ }
+
+ public void hflush() throws IOException {
+ long posBefore = wrapped.getPos();
+ wrapped.hflush();
+ long posAfter = wrapped.getPos();
+ operations.add(new WriteOperation(source, "hflush()", posBefore, posAfter, 0, null));
+ }
+
+ public void hsync() throws IOException {
+ long posBefore = wrapped.getPos();
+ wrapped.hsync();
+ long posAfter = wrapped.getPos();
+ operations.add(new WriteOperation(source, "hsync()", posBefore, posAfter, 0, null));
+ }
+ }
+
+ private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(
+ "message schema {"
+ + "required int32 id;"
+ + "required binary name;"
+ + "required int32 age;"
+ + "}"
+ );
+
+ @Before
+ public void setUp() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.setUpSpark();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.tearDownSpark();
+ }
+
+ @Test
+ public void testCompareOutputStreamBehavior() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ REAL-TIME OUTPUTSTREAM COMPARISON: LOCAL vs SEAWEEDFS ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ // Prepare file systems
+ Configuration conf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT));
+ FileSystem seaweedFs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s",
+ SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf);
+
+ // Prepare paths
+ new java.io.File("/workspace/target/test-output").mkdirs();
+ Path localPath = new Path("file:///workspace/target/test-output/write-comparison-local.parquet");
+ Path seaweedPath = new Path(getTestPath("write-comparison-seaweed.parquet"));
+
+ // Delete if exists
+ localFs.delete(localPath, false);
+ seaweedFs.delete(seaweedPath, false);
+
+ List<WriteOperation> localOps = new ArrayList<>();
+ List<WriteOperation> seaweedOps = new ArrayList<>();
+
+ System.out.println("\n1. Writing Parquet files with synchronized operations...\n");
+
+        // Write using ParquetWriter on each backend. ParquetWriter opens its own output streams,
+        // so the localOps/seaweedOps lists above stay empty in this test; per-call logging is
+        // exercised in testCompareRawOutputStreamOperations below.
+ GroupWriteSupport.setSchema(SCHEMA, conf);
+
+ // Create data
+ SimpleGroupFactory groupFactory = new SimpleGroupFactory(SCHEMA);
+ List<Group> groups = new ArrayList<>();
+ groups.add(groupFactory.newGroup().append("id", 1).append("name", "Alice").append("age", 30));
+ groups.add(groupFactory.newGroup().append("id", 2).append("name", "Bob").append("age", 25));
+ groups.add(groupFactory.newGroup().append("id", 3).append("name", "Charlie").append("age", 35));
+
+ // Write to local disk
+ System.out.println(" Writing to LOCAL DISK...");
+ try (ParquetWriter<Group> localWriter = new ParquetWriter<>(
+ localPath,
+ new GroupWriteSupport(),
+ CompressionCodecName.SNAPPY,
+ 1024 * 1024, // Block size
+ 1024, // Page size
+ 1024, // Dictionary page size
+ true, // Enable dictionary
+ false, // Don't validate
+ ParquetWriter.DEFAULT_WRITER_VERSION,
+ conf)) {
+ for (Group group : groups) {
+ localWriter.write(group);
+ }
+ }
+ System.out.println(" ✅ Local write complete");
+
+ // Write to SeaweedFS
+ System.out.println("\n Writing to SEAWEEDFS...");
+ try (ParquetWriter<Group> seaweedWriter = new ParquetWriter<>(
+ seaweedPath,
+ new GroupWriteSupport(),
+ CompressionCodecName.SNAPPY,
+ 1024 * 1024, // Block size
+ 1024, // Page size
+ 1024, // Dictionary page size
+ true, // Enable dictionary
+ false, // Don't validate
+ ParquetWriter.DEFAULT_WRITER_VERSION,
+ conf)) {
+ for (Group group : groups) {
+ seaweedWriter.write(group);
+ }
+ }
+ System.out.println(" ✅ SeaweedFS write complete");
+
+ // Compare file sizes
+ System.out.println("\n2. Comparing final file sizes...");
+ long localSize = localFs.getFileStatus(localPath).getLen();
+ long seaweedSize = seaweedFs.getFileStatus(seaweedPath).getLen();
+ System.out.println(" LOCAL: " + localSize + " bytes");
+ System.out.println(" SEAWEED: " + seaweedSize + " bytes");
+
+ if (localSize == seaweedSize) {
+ System.out.println(" ✅ File sizes MATCH");
+ } else {
+ System.out.println(" ❌ File sizes DIFFER by " + Math.abs(localSize - seaweedSize) + " bytes");
+ }
+
+ // Now test reading both files
+ System.out.println("\n3. Testing if both files can be read by Spark...");
+
+ System.out.println("\n Reading LOCAL file:");
+ try {
+ org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> localDf =
+ spark.read().parquet(localPath.toString());
+ long localCount = localDf.count();
+ System.out.println(" ✅ LOCAL read SUCCESS - " + localCount + " rows");
+ localDf.show();
+ } catch (Exception e) {
+ System.out.println(" ❌ LOCAL read FAILED: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ System.out.println("\n Reading SEAWEEDFS file:");
+ try {
+ org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> seaweedDf =
+ spark.read().parquet(seaweedPath.toString());
+ long seaweedCount = seaweedDf.count();
+ System.out.println(" ✅ SEAWEEDFS read SUCCESS - " + seaweedCount + " rows");
+ seaweedDf.show();
+ } catch (Exception e) {
+ System.out.println(" ❌ SEAWEEDFS read FAILED: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ COMPARISON COMPLETE ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ }
+
+ @Test
+ public void testCompareRawOutputStreamOperations() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ RAW OUTPUTSTREAM COMPARISON: LOCAL vs SEAWEEDFS ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ // Prepare file systems
+ Configuration conf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT));
+ FileSystem seaweedFs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s",
+ SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf);
+
+ // Prepare paths
+ new java.io.File("/workspace/target/test-output").mkdirs();
+ Path localPath = new Path("file:///workspace/target/test-output/raw-comparison-local.dat");
+ Path seaweedPath = new Path(getTestPath("raw-comparison-seaweed.dat"));
+
+ // Delete if exists
+ localFs.delete(localPath, false);
+ seaweedFs.delete(seaweedPath, false);
+
+ List<WriteOperation> localOps = new ArrayList<>();
+ List<WriteOperation> seaweedOps = new ArrayList<>();
+
+ System.out.println("\n1. Performing synchronized write operations...\n");
+
+ // Open both streams
+ FSDataOutputStream localStream = localFs.create(localPath, true);
+ FSDataOutputStream seaweedStream = seaweedFs.create(seaweedPath, true);
+
+ LoggingOutputStream localLogging = new LoggingOutputStream(localStream, "LOCAL", localOps);
+ LoggingOutputStream seaweedLogging = new LoggingOutputStream(seaweedStream, "SEAWEED", seaweedOps);
+
+ int opCount = 0;
+ boolean mismatchFound = false;
+
+ // Operation 1: Write 4 bytes (magic)
+ opCount++;
+ System.out.println(" Op " + opCount + ": write(4 bytes) - Writing magic bytes");
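+        // Every Parquet file begins and ends with the 4-byte ASCII magic "PAR1".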
+ byte[] magic = "PAR1".getBytes();
+ localLogging.write(magic, 0, 4);
+ seaweedLogging.write(magic, 0, 4);
+ long localPos1 = localLogging.getPos();
+ long seaweedPos1 = seaweedLogging.getPos();
+ System.out.println(" LOCAL: getPos() = " + localPos1);
+ System.out.println(" SEAWEED: getPos() = " + seaweedPos1);
+ if (localPos1 != seaweedPos1) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 2: Write 100 bytes of data
+ opCount++;
+ System.out.println("\n Op " + opCount + ": write(100 bytes) - Writing data");
+ byte[] data = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ data[i] = (byte) i;
+ }
+ localLogging.write(data, 0, 100);
+ seaweedLogging.write(data, 0, 100);
+ long localPos2 = localLogging.getPos();
+ long seaweedPos2 = seaweedLogging.getPos();
+ System.out.println(" LOCAL: getPos() = " + localPos2);
+ System.out.println(" SEAWEED: getPos() = " + seaweedPos2);
+ if (localPos2 != seaweedPos2) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 3: Flush
+ opCount++;
+ System.out.println("\n Op " + opCount + ": flush()");
+ localLogging.flush();
+ seaweedLogging.flush();
+ long localPos3 = localLogging.getPos();
+ long seaweedPos3 = seaweedLogging.getPos();
+ System.out.println(" LOCAL: getPos() after flush = " + localPos3);
+ System.out.println(" SEAWEED: getPos() after flush = " + seaweedPos3);
+ if (localPos3 != seaweedPos3) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 4: Write more data
+ opCount++;
+ System.out.println("\n Op " + opCount + ": write(50 bytes) - Writing more data");
+ byte[] moreData = new byte[50];
+ for (int i = 0; i < 50; i++) {
+ moreData[i] = (byte) (i + 100);
+ }
+ localLogging.write(moreData, 0, 50);
+ seaweedLogging.write(moreData, 0, 50);
+ long localPos4 = localLogging.getPos();
+ long seaweedPos4 = seaweedLogging.getPos();
+ System.out.println(" LOCAL: getPos() = " + localPos4);
+ System.out.println(" SEAWEED: getPos() = " + seaweedPos4);
+ if (localPos4 != seaweedPos4) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 5: Write final bytes (simulating footer)
+ opCount++;
+ System.out.println("\n Op " + opCount + ": write(8 bytes) - Writing footer");
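+        // Parquet trailer layout: a 4-byte little-endian footer length (0x0000036B = 875 here, an
+        // arbitrary value for this test) followed by the "PAR1" magic.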
+ byte[] footer = new byte[]{0x6B, 0x03, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31};
+ localLogging.write(footer, 0, 8);
+ seaweedLogging.write(footer, 0, 8);
+ long localPos5 = localLogging.getPos();
+ long seaweedPos5 = seaweedLogging.getPos();
+ System.out.println(" LOCAL: getPos() = " + localPos5);
+ System.out.println(" SEAWEED: getPos() = " + seaweedPos5);
+ if (localPos5 != seaweedPos5) {
+ System.out.println(" ❌ MISMATCH!");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ Match");
+ }
+
+ // Operation 6: Close
+ opCount++;
+ System.out.println("\n Op " + opCount + ": close()");
+ System.out.println(" LOCAL: closing at position " + localPos5);
+ System.out.println(" SEAWEED: closing at position " + seaweedPos5);
+ localLogging.close();
+ seaweedLogging.close();
+
+ // Check final file sizes
+ System.out.println("\n2. Comparing final file sizes...");
+ long localSize = localFs.getFileStatus(localPath).getLen();
+ long seaweedSize = seaweedFs.getFileStatus(seaweedPath).getLen();
+ System.out.println(" LOCAL: " + localSize + " bytes");
+ System.out.println(" SEAWEED: " + seaweedSize + " bytes");
+
+ if (localSize != seaweedSize) {
+ System.out.println(" ❌ File sizes DIFFER by " + Math.abs(localSize - seaweedSize) + " bytes");
+ mismatchFound = true;
+ } else {
+ System.out.println(" ✅ File sizes MATCH");
+ }
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ COMPARISON SUMMARY ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ System.out.println(" Total operations: " + opCount);
+ System.out.println(" LOCAL operations: " + localOps.size());
+ System.out.println(" SEAWEED operations: " + seaweedOps.size());
+
+ if (mismatchFound) {
+ System.out.println("\n ❌ MISMATCHES FOUND - Streams behave differently!");
+ } else {
+ System.out.println("\n ✅ ALL OPERATIONS MATCH - Streams are identical!");
+ }
+
+ System.out.println("\n Detailed operation log:");
+ System.out.println(" ----------------------");
+ int maxOps = Math.max(localOps.size(), seaweedOps.size());
+ for (int i = 0; i < maxOps; i++) {
+ if (i < localOps.size()) {
+ System.out.println(" " + localOps.get(i));
+ }
+ if (i < seaweedOps.size()) {
+ System.out.println(" " + seaweedOps.get(i));
+ }
+ if (i < localOps.size() && i < seaweedOps.size()) {
+ WriteOperation localOp = localOps.get(i);
+ WriteOperation seaweedOp = seaweedOps.get(i);
+ if (localOp.positionAfter != seaweedOp.positionAfter) {
+ System.out.println(" ⚠️ Position mismatch: LOCAL=" + localOp.positionAfter +
+ " SEAWEED=" + seaweedOp.positionAfter);
+ }
+ }
+ }
+
+ assertFalse("Streams should behave identically", mismatchFound);
+ }
+}
+
diff --git a/test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java
new file mode 100644
index 000000000..5636618ec
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java
@@ -0,0 +1,387 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Detailed comparison of InputStream/OutputStream operations between
+ * the local filesystem and SeaweedFS during Parquet file writing and reading.
+ *
+ * This test logs the read/write/getPos operations it can intercept to
+ * identify exactly where the behavior diverges.
+ */
+public class ParquetOperationComparisonTest extends SparkTestBase {
+
+ private static final String SCHEMA_STRING = "message Employee { " +
+ " required int32 id; " +
+ " required binary name (UTF8); " +
+ " required int32 age; " +
+ "}";
+
+ private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(SCHEMA_STRING);
+
+ // Track all operations for comparison
+ private static class OperationLog {
+ List<String> operations = new ArrayList<>();
+
+ void log(String op) {
+ operations.add(op);
+ System.out.println(" " + op);
+ }
+
+ void print(String title) {
+ System.out.println("\n" + title + " (" + operations.size() + " operations):");
+ for (int i = 0; i < operations.size(); i++) {
+ System.out.printf(" [%3d] %s\n", i, operations.get(i));
+ }
+ }
+
+ void compare(OperationLog other, String name1, String name2) {
+ System.out.println("\n=== COMPARISON: " + name1 + " vs " + name2 + " ===");
+
+ int maxLen = Math.max(operations.size(), other.operations.size());
+ int differences = 0;
+
+ for (int i = 0; i < maxLen; i++) {
+ String op1 = i < operations.size() ? operations.get(i) : "<missing>";
+ String op2 = i < other.operations.size() ? other.operations.get(i) : "<missing>";
+
+ if (!op1.equals(op2)) {
+ differences++;
+ System.out.printf("[%3d] DIFF:\n", i);
+ System.out.println(" " + name1 + ": " + op1);
+ System.out.println(" " + name2 + ": " + op2);
+ }
+ }
+
+ if (differences == 0) {
+ System.out.println("✅ Operations are IDENTICAL!");
+ } else {
+ System.out.println("❌ Found " + differences + " differences");
+ }
+ }
+ }
+
+ // Wrapper for FSDataOutputStream that logs all operations
+ private static class LoggingOutputStream extends FSDataOutputStream {
+ private final FSDataOutputStream delegate;
+ private final OperationLog log;
+ private final String name;
+
+ public LoggingOutputStream(FSDataOutputStream delegate, OperationLog log, String name) throws IOException {
+ super(delegate.getWrappedStream(), null);
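+            // The super call re-wraps the raw stream only to satisfy FSDataOutputStream's constructor;
+            // all overrides below delegate to the original FSDataOutputStream.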
+ this.delegate = delegate;
+ this.log = log;
+ this.name = name;
+ log.log(name + " CREATED");
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ log.log(String.format("write(byte) pos=%d", getPos()));
+ delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ long posBefore = getPos();
+ delegate.write(b, off, len);
+ long posAfter = getPos();
+ log.log(String.format("write(%d bytes) pos %d→%d", len, posBefore, posAfter));
+ }
+
+ @Override
+ public long getPos() {
+ long pos = delegate.getPos();
+            // Don't log getPos() itself; positions are already recorded in the write/flush/close entries
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ log.log(String.format("flush() pos=%d", getPos()));
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ log.log(String.format("close() pos=%d", getPos()));
+ delegate.close();
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ log.log(String.format("hflush() pos=%d", getPos()));
+ delegate.hflush();
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ log.log(String.format("hsync() pos=%d", getPos()));
+ delegate.hsync();
+ }
+ }
+
+ // Wrapper for FSDataInputStream that logs all operations
+ private static class LoggingInputStream extends FSDataInputStream {
+ private final OperationLog log;
+ private final String name;
+
+ public LoggingInputStream(FSDataInputStream delegate, OperationLog log, String name) throws IOException {
+ super(delegate);
+ this.log = log;
+ this.name = name;
+ log.log(name + " CREATED");
+ }
+
+ @Override
+ public int read() throws IOException {
+ long posBefore = getPos();
+ int result = super.read();
+ log.log(String.format("read() pos %d→%d result=%d", posBefore, getPos(), result));
+ return result;
+ }
+
+ // Can't override read(byte[], int, int) as it's final in DataInputStream
+ // The logging will happen through read(ByteBuffer) which is what Parquet uses
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ long posBefore = getPos();
+ int result = super.read(buf);
+ log.log(String.format("read(ByteBuffer %d) pos %d→%d result=%d", buf.remaining(), posBefore, getPos(),
+ result));
+ return result;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ long posBefore = getPos();
+ super.seek(pos);
+ log.log(String.format("seek(%d) pos %d→%d", pos, posBefore, getPos()));
+ }
+
+ @Override
+ public void close() throws IOException {
+ log.log(String.format("close() pos=%d", getPos()));
+ super.close();
+ }
+ }
+
+ @Test
+ public void testCompareWriteOperations() throws Exception {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ PARQUET WRITE OPERATION COMPARISON TEST ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝\n");
+
+ // Setup filesystems
+ Configuration localConf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(localConf);
+
+ Configuration seaweedConf = new Configuration();
+ seaweedConf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ seaweedConf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ seaweedConf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT);
+ FileSystem seaweedFs = FileSystem.get(
+ java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT),
+ seaweedConf);
+
+ Path localPath = new Path("/tmp/test-local-ops-" + System.currentTimeMillis() + ".parquet");
+ Path seaweedPath = new Path("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT +
+ "/test-spark/ops-test.parquet");
+
+ OperationLog localLog = new OperationLog();
+ OperationLog seaweedLog = new OperationLog();
+
+ // Write to local filesystem with logging
+ System.out.println("=== Writing to LOCAL filesystem ===");
+ writeParquetWithLogging(localFs, localPath, localConf, localLog, "LOCAL");
+
+ System.out.println("\n=== Writing to SEAWEEDFS ===");
+ writeParquetWithLogging(seaweedFs, seaweedPath, seaweedConf, seaweedLog, "SEAWEED");
+
+ // Print logs
+ localLog.print("LOCAL OPERATIONS");
+ seaweedLog.print("SEAWEEDFS OPERATIONS");
+
+ // Compare
+ localLog.compare(seaweedLog, "LOCAL", "SEAWEEDFS");
+
+ // Cleanup
+ localFs.delete(localPath, false);
+ seaweedFs.delete(seaweedPath, false);
+
+ localFs.close();
+ seaweedFs.close();
+
+ System.out.println("\n=== Test Complete ===");
+ }
+
+ @Test
+ public void testCompareReadOperations() throws Exception {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ PARQUET READ OPERATION COMPARISON TEST ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝\n");
+
+ // Setup filesystems
+ Configuration localConf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(localConf);
+
+ Configuration seaweedConf = new Configuration();
+ seaweedConf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ seaweedConf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ seaweedConf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT);
+ FileSystem seaweedFs = FileSystem.get(
+ java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT),
+ seaweedConf);
+
+ Path localPath = new Path("/tmp/test-local-read-" + System.currentTimeMillis() + ".parquet");
+ Path seaweedPath = new Path("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT +
+ "/test-spark/read-test.parquet");
+
+ // First write files without logging
+ System.out.println("=== Writing test files ===");
+ writeParquetSimple(localFs, localPath, localConf);
+ writeParquetSimple(seaweedFs, seaweedPath, seaweedConf);
+ System.out.println("✅ Files written");
+
+ OperationLog localLog = new OperationLog();
+ OperationLog seaweedLog = new OperationLog();
+
+ // Read from local filesystem with logging
+ System.out.println("\n=== Reading from LOCAL filesystem ===");
+ readParquetWithLogging(localFs, localPath, localLog, "LOCAL");
+
+ System.out.println("\n=== Reading from SEAWEEDFS ===");
+ readParquetWithLogging(seaweedFs, seaweedPath, seaweedLog, "SEAWEED");
+
+ // Print logs
+ localLog.print("LOCAL READ OPERATIONS");
+ seaweedLog.print("SEAWEEDFS READ OPERATIONS");
+
+ // Compare
+ localLog.compare(seaweedLog, "LOCAL", "SEAWEEDFS");
+
+ // Cleanup
+ localFs.delete(localPath, false);
+ seaweedFs.delete(seaweedPath, false);
+
+ localFs.close();
+ seaweedFs.close();
+
+ System.out.println("\n=== Test Complete ===");
+ }
+
+ private void writeParquetWithLogging(FileSystem fs, Path path, Configuration conf,
+ OperationLog log, String name) throws IOException {
+ // We can't easily intercept ParquetWriter's internal stream usage,
+ // but we can log the file operations
+ log.log(name + " START WRITE");
+
+ GroupWriteSupport.setSchema(SCHEMA, conf);
+
+ try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path)
+ .withConf(conf)
+ .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
+ .build()) {
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA);
+
+ log.log("WRITE ROW 1");
+ Group group1 = factory.newGroup()
+ .append("id", 1)
+ .append("name", "Alice")
+ .append("age", 30);
+ writer.write(group1);
+
+ log.log("WRITE ROW 2");
+ Group group2 = factory.newGroup()
+ .append("id", 2)
+ .append("name", "Bob")
+ .append("age", 25);
+ writer.write(group2);
+
+ log.log("WRITE ROW 3");
+ Group group3 = factory.newGroup()
+ .append("id", 3)
+ .append("name", "Charlie")
+ .append("age", 35);
+ writer.write(group3);
+
+ log.log("CLOSE WRITER");
+ }
+
+ // Check final file size
+ org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(path);
+ log.log(String.format("FINAL FILE SIZE: %d bytes", status.getLen()));
+ }
+
+ private void writeParquetSimple(FileSystem fs, Path path, Configuration conf) throws IOException {
+ GroupWriteSupport.setSchema(SCHEMA, conf);
+
+ try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path)
+ .withConf(conf)
+ .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
+ .build()) {
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA);
+
+ writer.write(factory.newGroup().append("id", 1).append("name", "Alice").append("age", 30));
+ writer.write(factory.newGroup().append("id", 2).append("name", "Bob").append("age", 25));
+ writer.write(factory.newGroup().append("id", 3).append("name", "Charlie").append("age", 35));
+ }
+ }
+
+ private void readParquetWithLogging(FileSystem fs, Path path, OperationLog log, String name) throws IOException {
+ log.log(name + " START READ");
+
+ // Read file in chunks to see the pattern
+ try (FSDataInputStream in = fs.open(path)) {
+ byte[] buffer = new byte[256];
+ int totalRead = 0;
+ int chunkNum = 0;
+
+ while (true) {
+ long posBefore = in.getPos();
+ int bytesRead = in.read(buffer);
+
+ if (bytesRead == -1) {
+ log.log(String.format("READ CHUNK %d: EOF at pos=%d", chunkNum, posBefore));
+ break;
+ }
+
+ totalRead += bytesRead;
+ log.log(String.format("READ CHUNK %d: %d bytes at pos %d→%d",
+ chunkNum, bytesRead, posBefore, in.getPos()));
+ chunkNum++;
+ }
+
+ log.log(String.format("TOTAL READ: %d bytes in %d chunks", totalRead, chunkNum));
+ }
+ }
+}
diff --git a/test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java b/test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java
new file mode 100644
index 000000000..0002c26b1
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java
@@ -0,0 +1,286 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test to verify whether file chunks are preserved during rename operations.
+ * This could explain why Parquet files become unreadable after Spark's commit phase.
+ */
+public class RenameChunkVerificationTest extends SparkTestBase {
+
+ @Before
+ public void setUp() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.setUpSpark();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.tearDownSpark();
+ }
+
+ @Test
+ public void testSparkWriteAndRenamePreservesChunks() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ TESTING: Chunk Preservation During Spark Write & Rename ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ // Write using Spark (which uses rename for commit)
+ List<SparkSQLTest.Employee> employees = Arrays.asList(
+ new SparkSQLTest.Employee(1, "Alice", "Engineering", 100000),
+ new SparkSQLTest.Employee(2, "Bob", "Sales", 80000),
+ new SparkSQLTest.Employee(3, "Charlie", "Engineering", 120000),
+ new SparkSQLTest.Employee(4, "David", "Sales", 75000));
+
+ org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df =
+ spark.createDataFrame(employees, SparkSQLTest.Employee.class);
+
+ String tablePath = getTestPath("chunk-test");
+
+ System.out.println("\n1. Writing Parquet file using Spark...");
+ df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(tablePath);
+ System.out.println(" ✅ Write complete");
+
+ // Get file system
+ Configuration conf = new Configuration();
+ conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT));
+ FileSystem fs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s",
+ SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf);
+
+ // Find the parquet file
+ Path parquetFile = null;
+ org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(new Path(tablePath));
+ for (org.apache.hadoop.fs.FileStatus file : files) {
+ if (file.getPath().getName().endsWith(".parquet") &&
+ !file.getPath().getName().startsWith("_")) {
+ parquetFile = file.getPath();
+ break;
+ }
+ }
+
+ assertNotNull("Parquet file not found", parquetFile);
+
+ System.out.println("\n2. Checking file metadata after Spark write...");
+ org.apache.hadoop.fs.FileStatus fileStatus = fs.getFileStatus(parquetFile);
+ long fileSize = fileStatus.getLen();
+ System.out.println(" File: " + parquetFile.getName());
+ System.out.println(" Size: " + fileSize + " bytes");
+
+ // Try to read the file
+ System.out.println("\n3. Attempting to read file with Spark...");
+ try {
+ org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readDf =
+ spark.read().parquet(tablePath);
+ long count = readDf.count();
+ System.out.println(" ✅ Read SUCCESS - " + count + " rows");
+ readDf.show();
+ } catch (Exception e) {
+ System.out.println(" ❌ Read FAILED: " + e.getMessage());
+ System.out.println("\n Error details:");
+ e.printStackTrace();
+
+ // This is expected to fail - let's investigate why
+ System.out.println("\n4. Investigating chunk availability...");
+
+ // Try to read the raw bytes
+ System.out.println("\n Attempting to read raw bytes...");
+ try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(parquetFile)) {
+ byte[] header = new byte[4];
+ int read = in.read(header);
+ System.out.println(" Read " + read + " bytes");
+ System.out.println(" Header: " + bytesToHex(header));
+
+ if (read == 4 && Arrays.equals(header, "PAR1".getBytes())) {
+ System.out.println(" ✅ Magic bytes are correct (PAR1)");
+ } else {
+ System.out.println(" ❌ Magic bytes are WRONG!");
+ }
+
+ // Try to read footer
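+                // (for a valid Parquet file the last 8 bytes are a 4-byte footer length followed by "PAR1")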
+ in.seek(fileSize - 8);
+ byte[] footer = new byte[8];
+ read = in.read(footer);
+ System.out.println("\n Footer (last 8 bytes): " + bytesToHex(footer));
+
+ // Try to read entire file
+ in.seek(0);
+ byte[] allBytes = new byte[(int)fileSize];
+ int totalRead = 0;
+ while (totalRead < fileSize) {
+ int bytesRead = in.read(allBytes, totalRead, (int)(fileSize - totalRead));
+ if (bytesRead == -1) {
+ System.out.println(" ❌ Premature EOF at byte " + totalRead + " (expected " + fileSize + ")");
+ break;
+ }
+ totalRead += bytesRead;
+ }
+
+ if (totalRead == fileSize) {
+ System.out.println(" ✅ Successfully read all " + totalRead + " bytes");
+ } else {
+ System.out.println(" ❌ Only read " + totalRead + " of " + fileSize + " bytes");
+ }
+
+ } catch (Exception readEx) {
+ System.out.println(" ❌ Raw read failed: " + readEx.getMessage());
+ readEx.printStackTrace();
+ }
+ }
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ TEST COMPLETE ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ }
+
+ @Test
+ public void testManualRenamePreservesChunks() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ TESTING: Manual Rename Chunk Preservation ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ // Get file system
+ Configuration conf = new Configuration();
+ conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT));
+ FileSystem fs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s",
+ SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf);
+
+ Path sourcePath = new Path(getTestPath("rename-source.dat"));
+ Path destPath = new Path(getTestPath("rename-dest.dat"));
+
+ // Clean up
+ fs.delete(sourcePath, false);
+ fs.delete(destPath, false);
+
+ System.out.println("\n1. Creating test file...");
+ byte[] testData = new byte[1260];
+ for (int i = 0; i < testData.length; i++) {
+ testData[i] = (byte)(i % 256);
+ }
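+        // The repeating 0..255 pattern makes the offset of any corruption easy to spot when comparing bytes.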
+
+ try (org.apache.hadoop.fs.FSDataOutputStream out = fs.create(sourcePath, true)) {
+ out.write(testData);
+ }
+ System.out.println(" ✅ Created source file: " + sourcePath);
+
+ // Check source file
+ System.out.println("\n2. Verifying source file...");
+ org.apache.hadoop.fs.FileStatus sourceStatus = fs.getFileStatus(sourcePath);
+ System.out.println(" Size: " + sourceStatus.getLen() + " bytes");
+
+ // Read source file
+ try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(sourcePath)) {
+ byte[] readData = new byte[1260];
+ int totalRead = 0;
+ while (totalRead < 1260) {
+ int bytesRead = in.read(readData, totalRead, 1260 - totalRead);
+ if (bytesRead == -1) break;
+ totalRead += bytesRead;
+ }
+ System.out.println(" Read: " + totalRead + " bytes");
+
+ if (Arrays.equals(testData, readData)) {
+ System.out.println(" ✅ Source file data is correct");
+ } else {
+ System.out.println(" ❌ Source file data is CORRUPTED");
+ }
+ }
+
+ // Perform rename
+ System.out.println("\n3. Renaming file...");
+ boolean renamed = fs.rename(sourcePath, destPath);
+ System.out.println(" Rename result: " + renamed);
+
+ if (!renamed) {
+ System.out.println(" ❌ Rename FAILED");
+ return;
+ }
+
+ // Check destination file
+ System.out.println("\n4. Verifying destination file...");
+ org.apache.hadoop.fs.FileStatus destStatus = fs.getFileStatus(destPath);
+ System.out.println(" Size: " + destStatus.getLen() + " bytes");
+
+ if (destStatus.getLen() != sourceStatus.getLen()) {
+ System.out.println(" ❌ File size CHANGED during rename!");
+ System.out.println(" Source: " + sourceStatus.getLen());
+ System.out.println(" Dest: " + destStatus.getLen());
+ } else {
+ System.out.println(" ✅ File size preserved");
+ }
+
+ // Read destination file
+ try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(destPath)) {
+ byte[] readData = new byte[1260];
+ int totalRead = 0;
+ while (totalRead < 1260) {
+ int bytesRead = in.read(readData, totalRead, 1260 - totalRead);
+ if (bytesRead == -1) {
+ System.out.println(" ❌ Premature EOF at byte " + totalRead);
+ break;
+ }
+ totalRead += bytesRead;
+ }
+ System.out.println(" Read: " + totalRead + " bytes");
+
+ if (totalRead == 1260 && Arrays.equals(testData, readData)) {
+ System.out.println(" ✅ Destination file data is CORRECT");
+ } else {
+ System.out.println(" ❌ Destination file data is CORRUPTED or INCOMPLETE");
+
+ // Show first difference
+ for (int i = 0; i < Math.min(totalRead, 1260); i++) {
+ if (testData[i] != readData[i]) {
+ System.out.println(" First difference at byte " + i);
+ System.out.println(" Expected: " + String.format("0x%02X", testData[i]));
+ System.out.println(" Got: " + String.format("0x%02X", readData[i]));
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ System.out.println(" ❌ Read FAILED: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ // Clean up
+ fs.delete(destPath, false);
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ TEST COMPLETE ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ }
+
+ private String bytesToHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder();
+ for (byte b : bytes) {
+ sb.append(String.format("%02X ", b));
+ }
+ return sb.toString().trim();
+ }
+}
+
diff --git a/test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java b/test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java
new file mode 100644
index 000000000..092039042
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java
@@ -0,0 +1,140 @@
+package seaweed.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Simplified test with only one column to isolate the EOF issue.
+ */
+public class SimpleOneColumnTest extends SparkTestBase {
+
+ @Test
+ public void testSingleIntegerColumn() {
+ skipIfTestsDisabled();
+
+ // Clean up any previous test data
+ String tablePath = getTestPath("simple_data");
+ try {
+ spark.read().parquet(tablePath);
+ // If we get here, path exists, so delete it
+ org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(
+ new java.net.URI(tablePath),
+ spark.sparkContext().hadoopConfiguration());
+ fs.delete(new org.apache.hadoop.fs.Path(tablePath), true);
+ } catch (Exception e) {
+ // Path doesn't exist, which is fine
+ }
+
+ // Create simple data with just one integer column
+ List<SimpleData> data = Arrays.asList(
+ new SimpleData(1),
+ new SimpleData(2),
+ new SimpleData(3),
+ new SimpleData(4));
+
+ Dataset<Row> df = spark.createDataFrame(data, SimpleData.class);
+
+ // Write to SeaweedFS
+ df.write().mode(SaveMode.Overwrite).parquet(tablePath);
+
+ // Read back
+ Dataset<Row> readDf = spark.read().parquet(tablePath);
+
+ // Simple count
+ assertEquals(4, readDf.count());
+
+ // Create view and query
+ readDf.createOrReplaceTempView("simple");
+
+ // Simple WHERE query
+ Dataset<Row> filtered = spark.sql("SELECT value FROM simple WHERE value > 2");
+ assertEquals(2, filtered.count());
+
+ // Verify values
+ List<Row> results = filtered.collectAsList();
+ assertTrue(results.stream().anyMatch(r -> r.getInt(0) == 3));
+ assertTrue(results.stream().anyMatch(r -> r.getInt(0) == 4));
+ }
+
+ @Test
+ public void testSingleStringColumn() {
+ skipIfTestsDisabled();
+
+ // Create simple data with just one string column
+ List<StringData> data = Arrays.asList(
+ new StringData("Alice"),
+ new StringData("Bob"),
+ new StringData("Charlie"),
+ new StringData("David"));
+
+ Dataset<Row> df = spark.createDataFrame(data, StringData.class);
+
+ // Write to SeaweedFS
+ String tablePath = getTestPath("string_data");
+ df.write().mode(SaveMode.Overwrite).parquet(tablePath);
+
+ // Read back
+ Dataset<Row> readDf = spark.read().parquet(tablePath);
+
+ // Simple count
+ assertEquals(4, readDf.count());
+
+ // Create view and query
+ readDf.createOrReplaceTempView("strings");
+
+ // Simple WHERE query
+ Dataset<Row> filtered = spark.sql("SELECT name FROM strings WHERE name LIKE 'A%'");
+ assertEquals(1, filtered.count());
+
+ // Verify value
+ List<Row> results = filtered.collectAsList();
+ assertEquals("Alice", results.get(0).getString(0));
+ }
+
+ // Test data classes
+ public static class SimpleData implements java.io.Serializable {
+ private int value;
+
+ public SimpleData() {
+ }
+
+ public SimpleData(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public void setValue(int value) {
+ this.value = value;
+ }
+ }
+
+ public static class StringData implements java.io.Serializable {
+ private String name;
+
+ public StringData() {
+ }
+
+ public StringData(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ }
+}
+
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java
new file mode 100644
index 000000000..d3fc1555c
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java
@@ -0,0 +1,363 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Compare Spark DataFrame.write().parquet() operations between
+ * local filesystem and SeaweedFS to identify the exact difference
+ * that causes the 78-byte EOF error.
+ */
+public class SparkDataFrameWriteComparisonTest extends SparkTestBase {
+
+ private static class OperationLog {
+ List<String> operations = new ArrayList<>();
+
+ synchronized void log(String op) {
+ operations.add(op);
+ System.out.println(" " + op);
+ }
+
+ void print(String title) {
+ System.out.println("\n" + title + " (" + operations.size() + " operations):");
+ for (int i = 0; i < operations.size(); i++) {
+ System.out.printf(" [%3d] %s\n", i, operations.get(i));
+ }
+ }
+
+ void compare(OperationLog other, String name1, String name2) {
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ COMPARISON: " + name1 + " vs " + name2);
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ int maxLen = Math.max(operations.size(), other.operations.size());
+ int differences = 0;
+
+ for (int i = 0; i < maxLen; i++) {
+ String op1 = i < operations.size() ? operations.get(i) : "<missing>";
+ String op2 = i < other.operations.size() ? other.operations.get(i) : "<missing>";
+
+ // Normalize operation strings for comparison (remove file-specific parts)
+ String normalized1 = normalizeOp(op1);
+ String normalized2 = normalizeOp(op2);
+
+ if (!normalized1.equals(normalized2)) {
+ differences++;
+ System.out.printf("\n[%3d] DIFFERENCE:\n", i);
+ System.out.println(" " + name1 + ": " + op1);
+ System.out.println(" " + name2 + ": " + op2);
+ }
+ }
+
+ System.out.println("\n" + "=".repeat(64));
+ if (differences == 0) {
+ System.out.println("✅ Operations are IDENTICAL!");
+ } else {
+ System.out.println("❌ Found " + differences + " differences");
+ }
+ System.out.println("=".repeat(64));
+ }
+
+ private String normalizeOp(String op) {
+ // Remove file-specific identifiers for comparison
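+            // e.g. "part-00000-<uuid>-c000" collapses to "part-XXXXX" so logs from different runs line up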
+ return op.replaceAll("part-[0-9a-f-]+", "part-XXXXX")
+ .replaceAll("attempt_[0-9]+", "attempt_XXXXX")
+ .replaceAll("/tmp/[^/]+", "/tmp/XXXXX")
+ .replaceAll("test-local-[0-9]+", "test-local-XXXXX");
+ }
+ }
+
+ // Custom FileSystem wrapper that logs all operations
+ private static class LoggingFileSystem extends FilterFileSystem {
+ private final OperationLog log;
+ private final String name;
+
+ public LoggingFileSystem(FileSystem fs, OperationLog log, String name) {
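+            // FilterFileSystem stores the wrapped FileSystem in its protected "fs" field;
+            // assigning it directly here stands in for calling super(fs).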
+ this.fs = fs;
+ this.log = log;
+ this.name = name;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ log.log(String.format("%s CREATE: %s (bufferSize=%d)", name, f.getName(), bufferSize));
+ FSDataOutputStream out = fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+ return new LoggingOutputStream(out, log, name, f.getName());
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ log.log(String.format("%s APPEND: %s (bufferSize=%d)", name, f.getName(), bufferSize));
+ FSDataOutputStream out = fs.append(f, bufferSize, progress);
+ return new LoggingOutputStream(out, log, name, f.getName());
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ log.log(String.format("%s RENAME: %s → %s", name, src.getName(), dst.getName()));
+ return fs.rename(src, dst);
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ log.log(String.format("%s DELETE: %s (recursive=%s)", name, f.getName(), recursive));
+ return fs.delete(f, recursive);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+ FileStatus[] result = fs.listStatus(f);
+ log.log(String.format("%s LISTSTATUS: %s (%d files)", name, f.getName(), result.length));
+ return result;
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+ fs.setWorkingDirectory(new_dir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return fs.getWorkingDirectory();
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ log.log(String.format("%s MKDIRS: %s", name, f.getName()));
+ return fs.mkdirs(f, permission);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ FileStatus status = fs.getFileStatus(f);
+ log.log(String.format("%s GETFILESTATUS: %s (size=%d)", name, f.getName(), status.getLen()));
+ return status;
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ log.log(String.format("%s OPEN: %s (bufferSize=%d)", name, f.getName(), bufferSize));
+ return fs.open(f, bufferSize);
+ }
+ }
+
+ private static class LoggingOutputStream extends FSDataOutputStream {
+ private final FSDataOutputStream delegate;
+ private final OperationLog log;
+ private final String name;
+ private final String filename;
+ private long writeCount = 0;
+
+ public LoggingOutputStream(FSDataOutputStream delegate, OperationLog log, String name, String filename) throws IOException {
+ super(delegate.getWrappedStream(), null);
+ this.delegate = delegate;
+ this.log = log;
+ this.name = name;
+ this.filename = filename;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ writeCount++;
+ long posBefore = getPos();
+ delegate.write(b, off, len);
+ long posAfter = getPos();
+
+            // Log large writes (the Parquet footer is typically among these), the first few writes, and every 100th write
+ if (len >= 100 || writeCount <= 5 || (writeCount % 100 == 0)) {
+ log.log(String.format("%s WRITE #%d: %d bytes, pos %d→%d [%s]",
+ name, writeCount, len, posBefore, posAfter, filename));
+ }
+ }
+
+ @Override
+ public long getPos() {
+ long pos = delegate.getPos();
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ log.log(String.format("%s FLUSH: pos=%d [%s]", name, getPos(), filename));
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ log.log(String.format("%s CLOSE: pos=%d, totalWrites=%d [%s]",
+ name, getPos(), writeCount, filename));
+ delegate.close();
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ log.log(String.format("%s HFLUSH: pos=%d [%s]", name, getPos(), filename));
+ delegate.hflush();
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ log.log(String.format("%s HSYNC: pos=%d [%s]", name, getPos(), filename));
+ delegate.hsync();
+ }
+ }
+
+ @Test
+ public void testCompareSparkDataFrameWrite() throws Exception {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ return;
+ }
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ SPARK DATAFRAME.WRITE() OPERATION COMPARISON TEST ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝\n");
+
+ // Create test data (4 rows - this is what causes the error)
+ List<Employee> employees = Arrays.asList(
+ new Employee(1, "Alice", "Engineering", 100000),
+ new Employee(2, "Bob", "Sales", 80000),
+ new Employee(3, "Charlie", "Engineering", 120000),
+ new Employee(4, "David", "Sales", 75000)
+ );
+
+ Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
+
+ OperationLog localLog = new OperationLog();
+ OperationLog seaweedLog = new OperationLog();
+
+ // Test 1: Write to local filesystem with logging
+ System.out.println("=== Writing to LOCAL filesystem with Spark ===");
+ String localPath = "/tmp/spark-local-test-" + System.currentTimeMillis();
+
+ try {
+            // Build a logging wrapper around the local filesystem. Note that Spark resolves file://
+            // through its own Hadoop configuration, so this wrapper is not wired into the DataFrame
+            // write below; localFs is still used directly afterwards to inspect the output files.
+ Configuration localConf = new Configuration();
+ FileSystem localFs = FileSystem.getLocal(localConf);
+ LoggingFileSystem loggingLocalFs = new LoggingFileSystem(localFs, localLog, "LOCAL");
+
+ // Write using Spark
+ df.write().mode(SaveMode.Overwrite).parquet("file://" + localPath);
+
+ System.out.println("✅ Local write completed");
+
+ // Check final file
+ FileStatus[] files = localFs.listStatus(new Path(localPath));
+ for (FileStatus file : files) {
+ if (file.getPath().getName().endsWith(".parquet")) {
+ localLog.log(String.format("LOCAL FINAL FILE: %s (%d bytes)",
+ file.getPath().getName(), file.getLen()));
+ }
+ }
+
+ } catch (Exception e) {
+ System.out.println("❌ Local write failed: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ // Test 2: Write to SeaweedFS with logging
+ System.out.println("\n=== Writing to SEAWEEDFS with Spark ===");
+ String seaweedPath = getTestPath("spark-seaweed-test");
+
+ try {
+ df.write().mode(SaveMode.Overwrite).parquet(seaweedPath);
+
+ System.out.println("✅ SeaweedFS write completed");
+
+ // Check final file
+ Configuration seaweedConf = new Configuration();
+ seaweedConf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ seaweedConf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ seaweedConf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT);
+ FileSystem seaweedFs = FileSystem.get(
+ java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT),
+ seaweedConf);
+
+ FileStatus[] files = seaweedFs.listStatus(new Path(seaweedPath));
+ for (FileStatus file : files) {
+ if (file.getPath().getName().endsWith(".parquet")) {
+ seaweedLog.log(String.format("SEAWEED FINAL FILE: %s (%d bytes)",
+ file.getPath().getName(), file.getLen()));
+ }
+ }
+
+ } catch (Exception e) {
+ System.out.println("❌ SeaweedFS write failed: " + e.getMessage());
+ if (e.getMessage() != null && e.getMessage().contains("bytes left")) {
+ System.out.println("🎯 This is the 78-byte EOF error during WRITE!");
+ }
+ e.printStackTrace();
+ }
+
+ // Test 3: Try reading both
+ System.out.println("\n=== Reading LOCAL file ===");
+ try {
+ Dataset<Row> localDf = spark.read().parquet("file://" + localPath);
+ long count = localDf.count();
+ System.out.println("✅ Local read successful: " + count + " rows");
+ } catch (Exception e) {
+ System.out.println("❌ Local read failed: " + e.getMessage());
+ }
+
+ System.out.println("\n=== Reading SEAWEEDFS file ===");
+ try {
+ Dataset<Row> seaweedDf = spark.read().parquet(seaweedPath);
+ long count = seaweedDf.count();
+ System.out.println("✅ SeaweedFS read successful: " + count + " rows");
+ } catch (Exception e) {
+ System.out.println("❌ SeaweedFS read failed: " + e.getMessage());
+ if (e.getMessage() != null && e.getMessage().contains("bytes left")) {
+ System.out.println("🎯 This is the 78-byte EOF error during READ!");
+ }
+ }
+
+ // Print operation logs
+ localLog.print("LOCAL OPERATIONS");
+ seaweedLog.print("SEAWEEDFS OPERATIONS");
+
+ // Compare
+ localLog.compare(seaweedLog, "LOCAL", "SEAWEEDFS");
+
+ System.out.println("\n=== Test Complete ===");
+ }
+
+ // Employee class for test data
+ public static class Employee implements java.io.Serializable {
+ private int id;
+ private String name;
+ private String department;
+ private int salary;
+
+ public Employee() {}
+
+ public Employee(int id, String name, String department, int salary) {
+ this.id = id;
+ this.name = name;
+ this.department = department;
+ this.salary = salary;
+ }
+
+ public int getId() { return id; }
+ public void setId(int id) { this.id = id; }
+ public String getName() { return name; }
+ public void setName(String name) { this.name = name; }
+ public String getDepartment() { return department; }
+ public void setDepartment(String department) { this.department = department; }
+ public int getSalary() { return salary; }
+ public void setSalary(int salary) { this.salary = salary; }
+ }
+}
+
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java
new file mode 100644
index 000000000..1d1881563
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java
@@ -0,0 +1,177 @@
+package seaweed.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test Spark DataFrame.write() with the LOCAL filesystem to see if the issue is SeaweedFS-specific.
+ * This is the CRITICAL test to determine whether the 78-byte error also occurs with local files.
+ */
+public class SparkLocalFileSystemTest extends SparkTestBase {
+
+ private String localTestDir;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUpSpark();
+ localTestDir = "/tmp/spark-local-test-" + System.currentTimeMillis();
+ new File(localTestDir).mkdirs();
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ CRITICAL TEST: Spark DataFrame.write() to LOCAL filesystem ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ System.out.println("Local test directory: " + localTestDir);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Clean up
+ if (localTestDir != null) {
+ deleteDirectory(new File(localTestDir));
+ }
+ super.tearDownSpark();
+ }
+
+ @Test
+ public void testSparkWriteToLocalFilesystem() {
+ System.out.println("\n=== TEST: Write Parquet to Local Filesystem ===");
+
+ // Create test data (same as SparkSQLTest)
+ List<Employee> employees = Arrays.asList(
+ new Employee(1, "Alice", "Engineering", 100000),
+ new Employee(2, "Bob", "Sales", 80000),
+ new Employee(3, "Charlie", "Engineering", 120000),
+ new Employee(4, "David", "Sales", 75000));
+
+ Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
+
+ // Write to LOCAL filesystem using file:// protocol
+ String localPath = "file://" + localTestDir + "/employees";
+ System.out.println("Writing to: " + localPath);
+
+ try {
+ df.write().mode(SaveMode.Overwrite).parquet(localPath);
+ System.out.println("✅ Write completed successfully!");
+ } catch (Exception e) {
+ System.err.println("❌ Write FAILED: " + e.getMessage());
+ e.printStackTrace();
+ fail("Write to local filesystem failed: " + e.getMessage());
+ }
+
+ // Now try to READ back
+ System.out.println("\n=== TEST: Read Parquet from Local Filesystem ===");
+ System.out.println("Reading from: " + localPath);
+
+ try {
+ Dataset<Row> employeesDf = spark.read().parquet(localPath);
+ employeesDf.createOrReplaceTempView("employees");
+
+ // Run SQL query
+ Dataset<Row> engineeringEmployees = spark.sql(
+ "SELECT name, salary FROM employees WHERE department = 'Engineering'");
+
+ long count = engineeringEmployees.count();
+ System.out.println("✅ Read completed successfully! Found " + count + " engineering employees");
+
+ assertEquals("Should find 2 engineering employees", 2, count);
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ ✅ SUCCESS! Local filesystem works perfectly! ║");
+ System.out.println("║ This proves the issue is SeaweedFS-specific! ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+
+ } catch (Exception e) {
+ if (e.getMessage() != null && e.getMessage().contains("EOFException") && e.getMessage().contains("78 bytes")) {
+ System.err.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.err.println("║ ❌ CRITICAL: 78-byte error ALSO occurs with local files! ║");
+ System.err.println("║ This proves the issue is NOT SeaweedFS-specific! ║");
+ System.err.println("║ The issue is in Spark itself or our test setup! ║");
+ System.err.println("╚══════════════════════════════════════════════════════════════╝");
+ }
+ System.err.println("❌ Read FAILED: " + e.getMessage());
+ e.printStackTrace();
+ fail("Read from local filesystem failed: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSparkWriteReadMultipleTimes() {
+ System.out.println("\n=== TEST: Multiple Write/Read Cycles ===");
+
+ for (int i = 1; i <= 3; i++) {
+ System.out.println("\n--- Cycle " + i + " ---");
+
+ List<Employee> employees = Arrays.asList(
+ new Employee(i * 10 + 1, "Person" + (i * 10 + 1), "Dept" + i, 50000 + i * 10000),
+ new Employee(i * 10 + 2, "Person" + (i * 10 + 2), "Dept" + i, 60000 + i * 10000));
+
+ Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
+ String localPath = "file://" + localTestDir + "/cycle" + i;
+
+ // Write
+ df.write().mode(SaveMode.Overwrite).parquet(localPath);
+ System.out.println("✅ Cycle " + i + " write completed");
+
+ // Read back immediately
+ Dataset<Row> readDf = spark.read().parquet(localPath);
+ long count = readDf.count();
+ System.out.println("✅ Cycle " + i + " read completed: " + count + " rows");
+
+ assertEquals("Should have 2 rows", 2, count);
+ }
+
+ System.out.println("\n✅ All cycles completed successfully!");
+ }
+
+ private void deleteDirectory(File directory) {
+ if (directory.exists()) {
+ File[] files = directory.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectory(file);
+ } else {
+ file.delete();
+ }
+ }
+ }
+ directory.delete();
+ }
+ }
+
+ // Employee class for testing
+ public static class Employee implements java.io.Serializable {
+ private int id;
+ private String name;
+ private String department;
+ private int salary;
+
+ public Employee() {}
+
+ public Employee(int id, String name, String department, int salary) {
+ this.id = id;
+ this.name = name;
+ this.department = department;
+ this.salary = salary;
+ }
+
+ public int getId() { return id; }
+ public void setId(int id) { this.id = id; }
+ public String getName() { return name; }
+ public void setName(String name) { this.name = name; }
+ public String getDepartment() { return department; }
+ public void setDepartment(String department) { this.department = department; }
+ public int getSalary() { return salary; }
+ public void setSalary(int salary) { this.salary = salary; }
+ }
+}
+
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java
new file mode 100644
index 000000000..2fd3f4695
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java
@@ -0,0 +1,132 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test Spark with Hadoop's RawLocalFileSystem to see whether the 78-byte error can be reproduced.
+ * RawLocalFileSystem is Hadoop's thin, checksum-free wrapper around the native local filesystem.
+ */
+public class SparkRawLocalFSTest extends SparkTestBase {
+
+ private Path testPath;
+ private FileSystem rawLocalFs;
+
+ @Before
+ public void setUp() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ super.setUpSpark();
+
+ // Use RawLocalFileSystem explicitly
+ Configuration conf = new Configuration();
+ rawLocalFs = new RawLocalFileSystem();
+ rawLocalFs.initialize(java.net.URI.create("file:///"), conf);
+
+ testPath = new Path("/tmp/spark-rawlocal-test-" + System.currentTimeMillis());
+ rawLocalFs.delete(testPath, true);
+ rawLocalFs.mkdirs(testPath);
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ CRITICAL TEST: Spark with RawLocalFileSystem ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ System.out.println("Test directory: " + testPath);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+ if (rawLocalFs != null) {
+ rawLocalFs.delete(testPath, true);
+ rawLocalFs.close();
+ }
+ super.tearDownSpark();
+ }
+
+ @Test
+ public void testSparkWithRawLocalFileSystem() throws IOException {
+ skipIfTestsDisabled();
+
+ System.out.println("\n=== TEST: Write Parquet using RawLocalFileSystem ===");
+
+ // Create test data (same as SparkSQLTest)
+ List<Employee> employees = Arrays.asList(
+ new Employee(1, "Alice", "Engineering", 100000),
+ new Employee(2, "Bob", "Sales", 80000),
+ new Employee(3, "Charlie", "Engineering", 120000),
+ new Employee(4, "David", "Sales", 75000));
+
+ Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
+
+ // CRITICAL: Use file:// prefix to force local filesystem
+ String outputPath = "file://" + testPath.toString() + "/employees";
+ System.out.println("Writing to: " + outputPath);
+
+ // Write using Spark through the file:// scheme (Hadoop's LocalFileSystem, which wraps RawLocalFileSystem)
+ df.write().mode(SaveMode.Overwrite).parquet(outputPath);
+
+ System.out.println("✅ Write completed successfully!");
+
+ // Verify by reading back
+ System.out.println("\n=== TEST: Read Parquet using RawLocalFileSystem ===");
+ System.out.println("Reading from: " + outputPath);
+ Dataset<Row> employeesDf = spark.read().parquet(outputPath);
+ employeesDf.createOrReplaceTempView("employees");
+
+ // Run SQL queries
+ Dataset<Row> engineeringEmployees = spark.sql(
+ "SELECT name, salary FROM employees WHERE department = 'Engineering'");
+
+ long count = engineeringEmployees.count();
+ assertEquals(2, count);
+ System.out.println("✅ Read completed successfully! Found " + count + " engineering employees");
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ ✅ SUCCESS! RawLocalFileSystem works perfectly! ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝");
+ }
+
+ // Employee class for Spark DataFrame
+ public static class Employee implements java.io.Serializable {
+ private int id;
+ private String name;
+ private String department;
+ private int salary;
+
+ public Employee() {} // Required for Spark
+
+ public Employee(int id, String name, String department, int salary) {
+ this.id = id;
+ this.name = name;
+ this.department = department;
+ this.salary = salary;
+ }
+
+ // Getters and Setters (required for Spark)
+ public int getId() { return id; }
+ public void setId(int id) { this.id = id; }
+ public String getName() { return name; }
+ public void setName(String name) { this.name = name; }
+ public String getDepartment() { return department; }
+ public void setDepartment(String department) { this.department = department; }
+ public int getSalary() { return salary; }
+ public void setSalary(int salary) { this.salary = salary; }
+ }
+}
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java
new file mode 100644
index 000000000..f93d43ce7
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java
@@ -0,0 +1,194 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test if Spark can read a Parquet file that was written directly
+ * (not by Spark) to SeaweedFS.
+ *
+ * This isolates whether the 78-byte EOF error is in:
+ * - Spark's WRITE path (if this test passes)
+ * - Spark's READ path (if this test also fails)
+ */
+public class SparkReadDirectParquetTest extends SparkTestBase {
+
+ private static final String SCHEMA_STRING =
+ "message Employee { " +
+ " required int32 id; " +
+ " required binary name (UTF8); " +
+ " required int32 age; " +
+ "}";
+
+ private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(SCHEMA_STRING);
+
+ @Test
+ public void testSparkReadDirectlyWrittenParquet() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ SPARK READS DIRECTLY-WRITTEN PARQUET FILE TEST ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝\n");
+
+ String testPath = getSeaweedFSPath("/direct-write-test/employees.parquet");
+
+ // Step 1: Write Parquet file directly (not via Spark)
+ System.out.println("=== Step 1: Writing Parquet file directly (bypassing Spark) ===");
+ writeParquetFileDirect(testPath);
+ System.out.println("✅ File written successfully: " + testPath);
+
+ // Step 2: Try to read it with Spark
+ System.out.println("\n=== Step 2: Reading file with Spark ===");
+ try {
+ Dataset<Row> df = spark.read().parquet(testPath);
+
+ System.out.println("Schema:");
+ df.printSchema();
+
+ long count = df.count();
+ System.out.println("Row count: " + count);
+
+ System.out.println("\nData:");
+ df.show();
+
+ // Verify data
+ assertEquals("Should have 3 rows", 3, count);
+
+ System.out.println("\n✅ SUCCESS! Spark can read directly-written Parquet file!");
+ System.out.println("✅ This proves the issue is in SPARK'S WRITE PATH, not read path!");
+
+ } catch (Exception e) {
+ System.out.println("\n❌ FAILED! Spark cannot read directly-written Parquet file!");
+ System.out.println("Error: " + e.getMessage());
+
+ if (e.getMessage() != null && e.getMessage().contains("bytes left")) {
+ System.out.println("🎯 This is the 78-byte EOF error!");
+ System.out.println("❌ This means the issue is in SPARK'S READ PATH!");
+ }
+
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Test
+ public void testSparkWriteThenRead() throws Exception {
+ skipIfTestsDisabled();
+
+ System.out.println("\n╔══════════════════════════════════════════════════════════════╗");
+ System.out.println("║ SPARK WRITES THEN READS PARQUET FILE TEST (BASELINE) ║");
+ System.out.println("╚══════════════════════════════════════════════════════════════╝\n");
+
+ String testPath = getSeaweedFSPath("/spark-write-test/employees");
+
+ // Step 1: Write with Spark
+ System.out.println("=== Step 1: Writing Parquet file with Spark ===");
+ spark.sql("CREATE TABLE IF NOT EXISTS test_employees (id INT, name STRING, age INT) USING parquet LOCATION '" + testPath + "'");
+ spark.sql("INSERT INTO test_employees VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)");
+ System.out.println("✅ File written by Spark");
+
+ // Step 2: Try to read it with Spark
+ System.out.println("\n=== Step 2: Reading file with Spark ===");
+ try {
+ Dataset<Row> df = spark.read().parquet(testPath);
+
+ System.out.println("Schema:");
+ df.printSchema();
+
+ long count = df.count();
+ System.out.println("Row count: " + count);
+
+ System.out.println("\nData:");
+ df.show();
+
+ assertEquals("Should have 3 rows", 3, count);
+
+ System.out.println("\n✅ SUCCESS! Spark can read its own Parquet file!");
+
+ } catch (Exception e) {
+ System.out.println("\n❌ FAILED! Spark cannot read its own Parquet file!");
+ System.out.println("Error: " + e.getMessage());
+
+ if (e.getMessage() != null && e.getMessage().contains("bytes left")) {
+ System.out.println("🎯 This is the 78-byte EOF error!");
+ }
+
+ e.printStackTrace();
+ throw e;
+ } finally {
+ spark.sql("DROP TABLE IF EXISTS test_employees");
+ }
+ }
+
+ private void writeParquetFileDirect(String seaweedPath) throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+ conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST);
+ conf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT);
+
+ FileSystem fs = FileSystem.get(java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), conf);
+ Path path = new Path(seaweedPath);
+
+ // Ensure parent directory exists
+ fs.mkdirs(path.getParent());
+
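+ // GroupWriteSupport.setSchema stores the schema in the Configuration so the
+ // ExampleParquetWriter built below is expected to pick it up from there.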
+ GroupWriteSupport.setSchema(SCHEMA, conf);
+
+ try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path)
+ .withConf(conf)
+ .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
+ .build()) {
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA);
+
+ // Write the same 3 rows as the Spark-written baseline test
+ System.out.println(" Writing row 1: id=1, name=Alice, age=30");
+ Group group1 = factory.newGroup()
+ .append("id", 1)
+ .append("name", "Alice")
+ .append("age", 30);
+ writer.write(group1);
+
+ System.out.println(" Writing row 2: id=2, name=Bob, age=25");
+ Group group2 = factory.newGroup()
+ .append("id", 2)
+ .append("name", "Bob")
+ .append("age", 25);
+ writer.write(group2);
+
+ System.out.println(" Writing row 3: id=3, name=Charlie, age=35");
+ Group group3 = factory.newGroup()
+ .append("id", 3)
+ .append("name", "Charlie")
+ .append("age", 35);
+ writer.write(group3);
+ }
+
+ // Verify file was written
+ org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(path);
+ System.out.println(" File size: " + status.getLen() + " bytes");
+
+ fs.close();
+ }
+}
+
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java
new file mode 100644
index 000000000..10ea1cd3a
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java
@@ -0,0 +1,239 @@
+package seaweed.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for Spark read/write operations with SeaweedFS.
+ */
+public class SparkReadWriteTest extends SparkTestBase {
+
+ @Test
+ public void testWriteAndReadParquet() {
+ skipIfTestsDisabled();
+
+ // Create test data
+ List<Person> people = Arrays.asList(
+ new Person("Alice", 30),
+ new Person("Bob", 25),
+ new Person("Charlie", 35));
+
+ Dataset<Row> df = spark.createDataFrame(people, Person.class);
+
+ // Write to SeaweedFS
+ String outputPath = getTestPath("people.parquet");
+ df.write().mode(SaveMode.Overwrite).parquet(outputPath);
+
+ // Read back from SeaweedFS
+ Dataset<Row> readDf = spark.read().parquet(outputPath);
+
+ // Verify
+ assertEquals(3, readDf.count());
+ assertEquals(2, readDf.columns().length);
+
+ List<Row> results = readDf.collectAsList();
+ assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer) r.getAs("age") == 30));
+ assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer) r.getAs("age") == 25));
+ assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer) r.getAs("age") == 35));
+ }
+
+ @Test
+ public void testWriteAndReadCSV() {
+ skipIfTestsDisabled();
+
+ // Create test data
+ List<Person> people = Arrays.asList(
+ new Person("Alice", 30),
+ new Person("Bob", 25));
+
+ Dataset<Row> df = spark.createDataFrame(people, Person.class);
+
+ // Write to SeaweedFS as CSV
+ String outputPath = getTestPath("people.csv");
+ df.write().mode(SaveMode.Overwrite).option("header", "true").csv(outputPath);
+
+ // Read back from SeaweedFS
+ Dataset<Row> readDf = spark.read().option("header", "true").option("inferSchema", "true").csv(outputPath);
+
+ // Verify
+ assertEquals(2, readDf.count());
+ assertEquals(2, readDf.columns().length);
+ }
+
+ @Test
+ public void testWriteAndReadJSON() {
+ skipIfTestsDisabled();
+
+ // Create test data
+ List<Person> people = Arrays.asList(
+ new Person("Alice", 30),
+ new Person("Bob", 25),
+ new Person("Charlie", 35));
+
+ Dataset<Row> df = spark.createDataFrame(people, Person.class);
+
+ // Write to SeaweedFS as JSON
+ String outputPath = getTestPath("people.json");
+ df.write().mode(SaveMode.Overwrite).json(outputPath);
+
+ // Read back from SeaweedFS
+ Dataset<Row> readDf = spark.read().json(outputPath);
+
+ // Verify
+ assertEquals(3, readDf.count());
+ assertEquals(2, readDf.columns().length);
+ }
+
+ @Test
+ public void testWritePartitionedData() {
+ skipIfTestsDisabled();
+
+ // Create test data with multiple years
+ List<PersonWithYear> people = Arrays.asList(
+ new PersonWithYear("Alice", 30, 2020),
+ new PersonWithYear("Bob", 25, 2021),
+ new PersonWithYear("Charlie", 35, 2020),
+ new PersonWithYear("David", 28, 2021));
+
+ Dataset<Row> df = spark.createDataFrame(people, PersonWithYear.class);
+
+ // Write partitioned data to SeaweedFS
+ String outputPath = getTestPath("people_partitioned");
+ df.write().mode(SaveMode.Overwrite).partitionBy("year").parquet(outputPath);
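+ // partitionBy("year") lays files out Hive-style under the output path, e.g.:
+ //   people_partitioned/year=2020/part-*.parquet
+ //   people_partitioned/year=2021/part-*.parquet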
+
+ // Read back from SeaweedFS
+ Dataset<Row> readDf = spark.read().parquet(outputPath);
+
+ // Verify
+ assertEquals(4, readDf.count());
+
+ // Verify partition filtering works
+ Dataset<Row> filtered2020 = readDf.filter("year = 2020");
+ assertEquals(2, filtered2020.count());
+
+ Dataset<Row> filtered2021 = readDf.filter("year = 2021");
+ assertEquals(2, filtered2021.count());
+ }
+
+ @Test
+ public void testAppendMode() {
+ skipIfTestsDisabled();
+
+ String outputPath = getTestPath("people_append.parquet");
+
+ // Write first batch
+ List<Person> batch1 = Arrays.asList(
+ new Person("Alice", 30),
+ new Person("Bob", 25));
+ Dataset<Row> df1 = spark.createDataFrame(batch1, Person.class);
+ df1.write().mode(SaveMode.Overwrite).parquet(outputPath);
+
+ // Append second batch
+ List<Person> batch2 = Arrays.asList(
+ new Person("Charlie", 35),
+ new Person("David", 28));
+ Dataset<Row> df2 = spark.createDataFrame(batch2, Person.class);
+ df2.write().mode(SaveMode.Append).parquet(outputPath);
+
+ // Read back and verify
+ Dataset<Row> readDf = spark.read().parquet(outputPath);
+ assertEquals(4, readDf.count());
+ }
+
+ @Test
+ public void testLargeDataset() {
+ skipIfTestsDisabled();
+
+ // Create a larger dataset
+ Dataset<Row> largeDf = spark.range(0, 10000)
+ .selectExpr("id as value", "id * 2 as doubled");
+
+ String outputPath = getTestPath("large_dataset.parquet");
+ largeDf.write().mode(SaveMode.Overwrite).parquet(outputPath);
+
+ // Read back and verify
+ Dataset<Row> readDf = spark.read().parquet(outputPath);
+ assertEquals(10000, readDf.count());
+
+ // Verify some data (sort to ensure deterministic order)
+ Row firstRow = readDf.orderBy("value").first();
+ assertEquals(0L, firstRow.getLong(0));
+ assertEquals(0L, firstRow.getLong(1));
+ }
+
+ // Test data classes
+ public static class Person implements java.io.Serializable {
+ private String name;
+ private int age;
+
+ public Person() {
+ }
+
+ public Person(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+ }
+
+ public static class PersonWithYear implements java.io.Serializable {
+ private String name;
+ private int age;
+ private int year;
+
+ public PersonWithYear() {
+ }
+
+ public PersonWithYear(String name, int age, int year) {
+ this.name = name;
+ this.age = age;
+ this.year = year;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+
+ public int getYear() {
+ return year;
+ }
+
+ public void setYear(int year) {
+ this.year = year;
+ }
+ }
+}
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java
new file mode 100644
index 000000000..231952023
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java
@@ -0,0 +1,278 @@
+package seaweed.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for Spark SQL operations with SeaweedFS.
+ */
+public class SparkSQLTest extends SparkTestBase {
+
+ @Test
+ public void testCreateTableAndQuery() {
+ skipIfTestsDisabled();
+
+ // Create test data
+ List<Employee> employees = Arrays.asList(
+ new Employee(1, "Alice", "Engineering", 100000),
+ new Employee(2, "Bob", "Sales", 80000),
+ new Employee(3, "Charlie", "Engineering", 120000),
+ new Employee(4, "David", "Sales", 75000));
+
+ Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
+
+ // Write to SeaweedFS
+ String tablePath = getTestPath("employees");
+ df.write().mode(SaveMode.Overwrite).parquet(tablePath);
+
+ // Create temporary view
+ Dataset<Row> employeesDf = spark.read().parquet(tablePath);
+ employeesDf.createOrReplaceTempView("employees");
+
+ // Run SQL queries
+ Dataset<Row> engineeringEmployees = spark.sql(
+ "SELECT name, salary FROM employees WHERE department = 'Engineering'");
+
+ assertEquals(2, engineeringEmployees.count());
+
+ Dataset<Row> highPaidEmployees = spark.sql(
+ "SELECT name, salary FROM employees WHERE salary > 90000");
+
+ assertEquals(2, highPaidEmployees.count());
+ }
+
+ @Test
+ public void testAggregationQueries() {
+ skipIfTestsDisabled();
+
+ // Create sales data
+ List<Sale> sales = Arrays.asList(
+ new Sale("2024-01", "Product A", 100),
+ new Sale("2024-01", "Product B", 150),
+ new Sale("2024-02", "Product A", 120),
+ new Sale("2024-02", "Product B", 180),
+ new Sale("2024-03", "Product A", 110));
+
+ Dataset<Row> df = spark.createDataFrame(sales, Sale.class);
+
+ // Write to SeaweedFS
+ String tablePath = getTestPath("sales");
+ df.write().mode(SaveMode.Overwrite).parquet(tablePath);
+
+ // Create temporary view
+ Dataset<Row> salesDf = spark.read().parquet(tablePath);
+ salesDf.createOrReplaceTempView("sales");
+
+ // Aggregate query
+ Dataset<Row> monthlySales = spark.sql(
+ "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month");
+
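+ // Expected totals from the data above:
+ //   2024-01 -> 250 (100 + 150)
+ //   2024-02 -> 300 (120 + 180)
+ //   2024-03 -> 110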
+ List<Row> results = monthlySales.collectAsList();
+ assertEquals(3, results.size());
+ assertEquals("2024-01", results.get(0).getString(0));
+ assertEquals(250, results.get(0).getLong(1));
+ }
+
+ @Test
+ public void testJoinOperations() {
+ skipIfTestsDisabled();
+
+ // Create employee data
+ List<Employee> employees = Arrays.asList(
+ new Employee(1, "Alice", "Engineering", 100000),
+ new Employee(2, "Bob", "Sales", 80000));
+
+ // Create department data
+ List<Department> departments = Arrays.asList(
+ new Department("Engineering", "Building Products"),
+ new Department("Sales", "Selling Products"));
+
+ Dataset<Row> empDf = spark.createDataFrame(employees, Employee.class);
+ Dataset<Row> deptDf = spark.createDataFrame(departments, Department.class);
+
+ // Write to SeaweedFS
+ String empPath = getTestPath("employees_join");
+ String deptPath = getTestPath("departments_join");
+
+ empDf.write().mode(SaveMode.Overwrite).parquet(empPath);
+ deptDf.write().mode(SaveMode.Overwrite).parquet(deptPath);
+
+ // Read back and create views
+ spark.read().parquet(empPath).createOrReplaceTempView("emp");
+ spark.read().parquet(deptPath).createOrReplaceTempView("dept");
+
+ // Join query
+ Dataset<Row> joined = spark.sql(
+ "SELECT e.name, e.salary, d.description " +
+ "FROM emp e JOIN dept d ON e.department = d.name");
+
+ assertEquals(2, joined.count());
+
+ List<Row> results = joined.collectAsList();
+ assertTrue(results.stream()
+ .anyMatch(r -> "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2))));
+ }
+
+ @Test
+ public void testWindowFunctions() {
+ skipIfTestsDisabled();
+
+ // Create employee data with salaries
+ List<Employee> employees = Arrays.asList(
+ new Employee(1, "Alice", "Engineering", 100000),
+ new Employee(2, "Bob", "Engineering", 120000),
+ new Employee(3, "Charlie", "Sales", 80000),
+ new Employee(4, "David", "Sales", 90000));
+
+ Dataset<Row> df = spark.createDataFrame(employees, Employee.class);
+
+ String tablePath = getTestPath("employees_window");
+ df.write().mode(SaveMode.Overwrite).parquet(tablePath);
+
+ Dataset<Row> employeesDf = spark.read().parquet(tablePath);
+ employeesDf.createOrReplaceTempView("employees_ranked");
+
+ // Window function query - rank employees by salary within department
+ Dataset<Row> ranked = spark.sql(
+ "SELECT name, department, salary, " +
+ "RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " +
+ "FROM employees_ranked");
+
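+ // Expected ranks (per department, salary descending):
+ //   Engineering: Bob (120000) -> 1, Alice (100000) -> 2
+ //   Sales:       David (90000) -> 1, Charlie (80000) -> 2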
+ assertEquals(4, ranked.count());
+
+ // Verify Bob has rank 1 in Engineering (highest salary)
+ List<Row> results = ranked.collectAsList();
+ Row bobRow = results.stream()
+ .filter(r -> "Bob".equals(r.getString(0)))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(bobRow);
+ assertEquals(1, bobRow.getInt(3));
+ }
+
+ // Test data classes
+ public static class Employee implements java.io.Serializable {
+ private int id;
+ private String name;
+ private String department;
+ private int salary;
+
+ public Employee() {
+ }
+
+ public Employee(int id, String name, String department, int salary) {
+ this.id = id;
+ this.name = name;
+ this.department = department;
+ this.salary = salary;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDepartment() {
+ return department;
+ }
+
+ public void setDepartment(String department) {
+ this.department = department;
+ }
+
+ public int getSalary() {
+ return salary;
+ }
+
+ public void setSalary(int salary) {
+ this.salary = salary;
+ }
+ }
+
+ public static class Sale implements java.io.Serializable {
+ private String month;
+ private String product;
+ private int amount;
+
+ public Sale() {
+ }
+
+ public Sale(String month, String product, int amount) {
+ this.month = month;
+ this.product = product;
+ this.amount = amount;
+ }
+
+ public String getMonth() {
+ return month;
+ }
+
+ public void setMonth(String month) {
+ this.month = month;
+ }
+
+ public String getProduct() {
+ return product;
+ }
+
+ public void setProduct(String product) {
+ this.product = product;
+ }
+
+ public int getAmount() {
+ return amount;
+ }
+
+ public void setAmount(int amount) {
+ this.amount = amount;
+ }
+ }
+
+ public static class Department implements java.io.Serializable {
+ private String name;
+ private String description;
+
+ public Department() {
+ }
+
+ public Department(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+ }
+}
diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java
new file mode 100644
index 000000000..5b17e6f2d
--- /dev/null
+++ b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java
@@ -0,0 +1,128 @@
+package seaweed.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Base class for Spark integration tests with SeaweedFS.
+ *
+ * These tests require a running SeaweedFS cluster.
+ * Set the environment variable SEAWEEDFS_TEST_ENABLED=true to enable them.
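+ *
+ * For example: SEAWEEDFS_TEST_ENABLED=true mvn test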
+ */
+public abstract class SparkTestBase {
+
+ protected SparkSession spark;
+ protected static final String TEST_ROOT = "/test-spark";
+ protected static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED"));
+
+ // SeaweedFS connection settings
+ protected static final String SEAWEEDFS_HOST = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost");
+ protected static final String SEAWEEDFS_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888");
+ protected static final String SEAWEEDFS_GRPC_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT",
+ "18888");
+
+ @Before
+ public void setUpSpark() throws IOException {
+ if (!TESTS_ENABLED) {
+ return;
+ }
+
+ SparkConf sparkConf = new SparkConf()
+ .setAppName("SeaweedFS Integration Test")
+ .setMaster("local[1]") // Single thread to avoid concurrent gRPC issues
+ .set("spark.driver.host", "localhost")
+ .set("spark.sql.warehouse.dir", getSeaweedFSPath("/spark-warehouse"))
+ // SeaweedFS configuration
+ .set("spark.hadoop.fs.defaultFS", String.format("seaweedfs://%s:%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT))
+ .set("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem")
+ .set("spark.hadoop.fs.seaweed.impl", "seaweed.hdfs.SeaweedFileSystem")
+ .set("spark.hadoop.fs.seaweed.filer.host", SEAWEEDFS_HOST)
+ .set("spark.hadoop.fs.seaweed.filer.port", SEAWEEDFS_PORT)
+ .set("spark.hadoop.fs.seaweed.filer.port.grpc", SEAWEEDFS_GRPC_PORT)
+ .set("spark.hadoop.fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem")
+ // Set replication to an empty string to use the filer default
+ .set("spark.hadoop.fs.seaweed.replication", "")
+ // Smaller buffer to reduce load
+ .set("spark.hadoop.fs.seaweed.buffer.size", "1048576") // 1MB
+ // Reduce parallelism
+ .set("spark.default.parallelism", "1")
+ .set("spark.sql.shuffle.partitions", "1")
+ // Output committer: algorithm v2 with Spark's default SQL commit protocol
+ .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
+ .set("spark.sql.sources.commitProtocolClass",
+ "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ // Disable speculative execution to reduce load
+ .set("spark.speculation", "false")
+ // Increase task retries to handle transient consistency issues
+ .set("spark.task.maxFailures", "4")
+ // Use the task reaper to monitor killed/interrupted tasks until they exit
+ .set("spark.task.reaper.enabled", "true")
+ .set("spark.task.reaper.pollingInterval", "1s");
+
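+ // The same SeaweedFS wiring can also be supplied outside the code, e.g. via
+ // spark-submit --conf flags (illustrative; host/port are the defaults above):
+ //   spark-submit \
+ //     --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
+ //     --conf spark.hadoop.fs.seaweed.filer.host=localhost \
+ //     --conf spark.hadoop.fs.seaweed.filer.port=8888 \
+ //     ...
+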
+ spark = SparkSession.builder()
+ .config(sparkConf)
+ .getOrCreate();
+
+ // Clean up test directory
+ cleanupTestDirectory();
+ }
+
+ @After
+ public void tearDownSpark() {
+ if (!TESTS_ENABLED || spark == null) {
+ return;
+ }
+
+ try {
+ // Try to cleanup but don't fail if it doesn't work
+ cleanupTestDirectory();
+ } catch (Exception e) {
+ System.err.println("Cleanup failed: " + e.getMessage());
+ } finally {
+ try {
+ spark.stop();
+ } catch (Exception e) {
+ System.err.println("Spark stop failed: " + e.getMessage());
+ }
+ spark = null;
+ }
+ }
+
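+ // With the default settings, these helpers resolve paths like:
+ //   getSeaweedFSPath("/spark-warehouse") -> seaweedfs://localhost:8888/spark-warehouse
+ //   getTestPath("people.parquet")        -> seaweedfs://localhost:8888/test-spark/people.parquet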
+ protected String getSeaweedFSPath(String path) {
+ return String.format("seaweedfs://%s:%s%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT, path);
+ }
+
+ protected String getTestPath(String subPath) {
+ return getSeaweedFSPath(TEST_ROOT + "/" + subPath);
+ }
+
+ private void cleanupTestDirectory() {
+ if (spark != null) {
+ try {
+ Configuration conf = spark.sparkContext().hadoopConfiguration();
+ org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(
+ java.net.URI.create(getSeaweedFSPath("/")), conf);
+ org.apache.hadoop.fs.Path testPath = new org.apache.hadoop.fs.Path(TEST_ROOT);
+ if (fs.exists(testPath)) {
+ fs.delete(testPath, true);
+ }
+ } catch (Exception e) {
+ // Suppress cleanup errors - they shouldn't fail tests
+ // Common in distributed systems with eventual consistency
+ System.err.println("Warning: cleanup failed (non-critical): " + e.getMessage());
+ }
+ }
+ }
+
+ protected void skipIfTestsDisabled() {
+ if (!TESTS_ENABLED) {
+ System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set");
+ org.junit.Assume.assumeTrue("SEAWEEDFS_TEST_ENABLED not set", false);
+ }
+ }
+}
diff --git a/test/java/spark/src/test/resources/log4j.properties b/test/java/spark/src/test/resources/log4j.properties
new file mode 100644
index 000000000..0a603e1b0
--- /dev/null
+++ b/test/java/spark/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Set root logger level
+log4j.rootLogger=WARN, console
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set log levels for specific packages
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.parquet=WARN
+log4j.logger.seaweed=INFO
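+# To trace SeaweedFS client activity while debugging, this can be raised to DEBUG
+# (optional; not needed for normal test runs):
+# log4j.logger.seaweed=DEBUG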
+
+# Suppress unnecessary warnings
+log4j.logger.org.apache.spark.util.Utils=ERROR
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+