Diffstat (limited to 'test')
23 files changed, 4440 insertions, 41 deletions
diff --git a/test/java/spark/.gitignore b/test/java/spark/.gitignore new file mode 100644 index 000000000..62341354a --- /dev/null +++ b/test/java/spark/.gitignore @@ -0,0 +1,33 @@ +# Maven +target/ +.m2/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties + +# IDE +.idea/ +*.iml +.vscode/ +.classpath +.project +.settings/ + +# Spark +spark-warehouse/ +metastore_db/ +derby.log + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + + diff --git a/test/java/spark/Makefile b/test/java/spark/Makefile new file mode 100644 index 000000000..462447c66 --- /dev/null +++ b/test/java/spark/Makefile @@ -0,0 +1,75 @@ +.PHONY: help build test test-local test-docker clean run-example docker-up docker-down + +help: + @echo "SeaweedFS Spark Integration Tests" + @echo "" + @echo "Available targets:" + @echo " build - Build the project" + @echo " test - Run integration tests (requires SeaweedFS running)" + @echo " test-local - Run tests against local SeaweedFS" + @echo " test-docker - Run tests in Docker with SeaweedFS" + @echo " run-example - Run the example Spark application" + @echo " docker-up - Start SeaweedFS in Docker" + @echo " docker-down - Stop SeaweedFS Docker containers" + @echo " clean - Clean build artifacts" + +build: + mvn clean package + +test: + @if [ -z "$$SEAWEEDFS_TEST_ENABLED" ]; then \ + echo "Setting SEAWEEDFS_TEST_ENABLED=true"; \ + fi + SEAWEEDFS_TEST_ENABLED=true mvn test + +test-local: + @echo "Testing against local SeaweedFS (localhost:8888)..." + ./run-tests.sh + +test-docker: + @echo "Running tests in Docker..." + docker compose up --build --abort-on-container-exit spark-tests + docker compose down + +docker-up: + @echo "Starting SeaweedFS in Docker..." + docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer + @echo "Waiting for services to be ready..." + @sleep 5 + @echo "SeaweedFS is ready!" + @echo " Master: http://localhost:9333" + @echo " Filer: http://localhost:8888" + +docker-down: + @echo "Stopping SeaweedFS Docker containers..." + docker compose down -v + +run-example: + @echo "Running example application..." + @if ! command -v spark-submit > /dev/null; then \ + echo "Error: spark-submit not found. Please install Apache Spark."; \ + exit 1; \ + fi + spark-submit \ + --class seaweed.spark.SparkSeaweedFSExample \ + --master local[2] \ + --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ + --conf spark.hadoop.fs.seaweed.filer.host=localhost \ + --conf spark.hadoop.fs.seaweed.filer.port=8888 \ + --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ + --conf spark.hadoop.fs.seaweed.replication="" \ + target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ + seaweedfs://localhost:8888/spark-example-output + +clean: + mvn clean + @echo "Build artifacts cleaned" + +verify-seaweedfs: + @echo "Verifying SeaweedFS connection..." 
+ @curl -f http://localhost:8888/ > /dev/null 2>&1 && \ + echo "✓ SeaweedFS filer is accessible" || \ + (echo "✗ SeaweedFS filer is not accessible at http://localhost:8888"; exit 1) + +.DEFAULT_GOAL := help + diff --git a/test/java/spark/docker-compose.yml b/test/java/spark/docker-compose.yml new file mode 100644 index 000000000..ed8757b88 --- /dev/null +++ b/test/java/spark/docker-compose.yml @@ -0,0 +1,100 @@ +services: + seaweedfs-master: + build: + context: ../../../docker + dockerfile: Dockerfile.local + image: seaweedfs:local + container_name: seaweedfs-spark-master + ports: + - "9333:9333" + - "19333:19333" + command: "master -ip=seaweedfs-master -ip.bind=0.0.0.0 -port=9333 -port.grpc=19333 -volumeSizeLimitMB=50 -defaultReplication=000 -peers=none" + networks: + - seaweedfs-spark + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:9333/cluster/status"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s + + seaweedfs-volume: + build: + context: ../../../docker + dockerfile: Dockerfile.local + image: seaweedfs:local + container_name: seaweedfs-spark-volume + ports: + - "8080:8080" + - "18080:18080" + command: "volume -mserver=seaweedfs-master:9333 -ip=seaweedfs-volume -ip.bind=0.0.0.0 -port=8080 -port.grpc=18080 -publicUrl=seaweedfs-volume:8080 -max=100 -dir=/data -preStopSeconds=1" + volumes: + - seaweedfs-volume-data:/data + depends_on: + seaweedfs-master: + condition: service_healthy + networks: + - seaweedfs-spark + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8080/status"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s + + seaweedfs-filer: + build: + context: ../../../docker + dockerfile: Dockerfile.local + image: seaweedfs:local + container_name: seaweedfs-spark-filer + ports: + - "8888:8888" + - "18888:18888" + command: "filer -master=seaweedfs-master:9333 -ip=seaweedfs-filer -ip.bind=0.0.0.0 -port=8888 -port.grpc=18888" + depends_on: + seaweedfs-master: + condition: service_healthy + seaweedfs-volume: + condition: service_healthy + networks: + - seaweedfs-spark + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8888/"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 15s + + spark-tests: + image: maven:3.9-eclipse-temurin-17 + container_name: seaweedfs-spark-tests + volumes: + - .:/workspace + - ./.m2:/root/.m2 + working_dir: /workspace + environment: + - SEAWEEDFS_TEST_ENABLED=true + - SEAWEEDFS_FILER_HOST=seaweedfs-filer + - SEAWEEDFS_FILER_PORT=8888 + - SEAWEEDFS_FILER_GRPC_PORT=18888 + - HADOOP_HOME=/tmp + # Disable Java DNS caching to ensure fresh DNS lookups + - MAVEN_OPTS=-Dsun.net.inetaddr.ttl=0 -Dnetworkaddress.cache.ttl=0 + - SPARK_SUBMIT_OPTS=-Dfs.seaweedfs.impl.disable.cache=true + command: sh -c "sleep 30 && mvn clean test" + depends_on: + seaweedfs-filer: + condition: service_healthy + networks: + - seaweedfs-spark + mem_limit: 4g + cpus: 2 + +networks: + seaweedfs-spark: + driver: bridge + +volumes: + seaweedfs-volume-data: + diff --git a/test/java/spark/pom.xml b/test/java/spark/pom.xml new file mode 100644 index 000000000..22228a856 --- /dev/null +++ b/test/java/spark/pom.xml @@ -0,0 +1,348 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.seaweedfs</groupId> + 
<artifactId>seaweedfs-spark-integration-tests</artifactId> + <version>1.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>SeaweedFS Spark Integration Tests</name> + <description>Integration tests for Apache Spark with SeaweedFS HDFS client</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + <spark.version>3.5.0</spark.version> + <hadoop.version>3.3.6</hadoop.version> + <scala.binary.version>2.12</scala.binary.version> + <junit.version>4.13.2</junit.version> + <seaweedfs.hadoop3.client.version>3.80.1-SNAPSHOT</seaweedfs.hadoop3.client.version> + <jackson.version>2.18.2</jackson.version> <!-- Upgraded from 2.15.3 --> + <netty.version>4.1.125.Final</netty.version> <!-- Upgraded to 4.1.125.Final for security fixes (CVE in netty-codec < 4.1.125.Final, netty-codec-http2 <= 4.1.123.Final) --> + <parquet.version>1.15.2</parquet.version> <!-- Upgraded to 1.15.2 for security fix --> + <parquet.format.version>2.12.0</parquet.format.version> + <surefire.jvm.args> + -Xmx2g + -Dhadoop.home.dir=/tmp + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + </surefire.jvm.args> + </properties> + + <!-- Override vulnerable transitive dependencies --> + <dependencyManagement> + <dependencies> + <!-- Jackson - Fix CVEs in older versions --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> + <version>${jackson.version}</version> + </dependency> + + <!-- Netty - Fix CVEs in older versions --> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + 
<groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http2</artifactId> + <version>${netty.version}</version> + </dependency> + + <!-- Apache Avro - Fix CVEs --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.11.4</version> + </dependency> + + <!-- Apache ZooKeeper - Fix CVEs --> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.9.4</version> + </dependency> + + <!-- Apache Commons - Fix CVEs --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.26.0</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.15.1</version> + </dependency> + <dependency> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils</artifactId> + <version>1.11.0</version> + </dependency> + + <!-- Guava - Fix CVEs --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>32.1.3-jre</version> + </dependency> + + <!-- SnakeYAML - Fix CVEs --> + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + <version>2.2</version> + </dependency> + + <!-- Protobuf - Fix CVEs --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>3.25.5</version> + </dependency> + + <!-- Nimbus JOSE JWT - Fix CVEs (GHSA-xwmg-2g98-w7v9 and others) --> + <dependency> + <groupId>com.nimbusds</groupId> + <artifactId>nimbus-jose-jwt</artifactId> + <version>10.0.2</version> + </dependency> + + <!-- Snappy Java - Fix CVEs --> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.10.4</version> + </dependency> + + <!-- DNS Java - Fix CVEs --> + <dependency> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + <version>3.6.0</version> + </dependency> + + <!-- Apache Parquet - Upgrade to latest for bug fixes --> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-common</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-encoding</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-format-structures</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-format</artifactId> + <version>${parquet.format.version}</version> + </dependency> + + </dependencies> + </dependencyManagement> + + <dependencies> + <!-- Spark Core --> + <dependency> + <groupId>org.apache.spark</groupId> + 
<artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Spark SQL --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Hadoop Client --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + + <!-- SeaweedFS Hadoop3 Client --> + <dependency> + <groupId>com.seaweedfs</groupId> + <artifactId>seaweedfs-hadoop3-client</artifactId> + <version>${seaweedfs.hadoop3.client.version}</version> + </dependency> + + <!-- Testing --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.36</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-reload4j</artifactId> + <version>1.7.36</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.11.0</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <skipTests>${skipTests}</skipTests> + <includes> + <include>**/*Test.java</include> + </includes> + <argLine>${surefire.jvm.args}</argLine> + <systemPropertyVariables> + <log4j.configuration>file:${project.basedir}/src/test/resources/log4j.properties</log4j.configuration> + </systemPropertyVariables> + <environmentVariables> + <HADOOP_HOME>/tmp</HADOOP_HOME> + </environmentVariables> + </configuration> + </plugin> + + <!-- Shade plugin to create fat jar for Spark submit --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.5.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>seaweed.spark.SparkSeaweedFSExample</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> + diff --git a/test/java/spark/quick-start.sh b/test/java/spark/quick-start.sh new file mode 100755 index 000000000..974363311 --- /dev/null +++ b/test/java/spark/quick-start.sh @@ -0,0 +1,149 @@ +#!/bin/bash + +set -e + +echo "=== SeaweedFS Spark Integration Tests Quick Start ===" +echo "" + +# Check if SeaweedFS is running +check_seaweedfs() { + echo "Checking if SeaweedFS is running..." 
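+    # curl --fail (-f) exits non-zero on HTTP errors as well as connection
+    # failures, so its exit status doubles as a simple liveness probe for the filer.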
+ if curl -f http://localhost:8888/ > /dev/null 2>&1; then + echo "✓ SeaweedFS filer is accessible at http://localhost:8888" + return 0 + else + echo "✗ SeaweedFS filer is not accessible" + return 1 + fi +} + +# Start SeaweedFS with Docker if not running +start_seaweedfs() { + echo "" + echo "Starting SeaweedFS with Docker..." + docker-compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer + + echo "Waiting for SeaweedFS to be ready..." + for i in {1..30}; do + if curl -f http://localhost:8888/ > /dev/null 2>&1; then + echo "✓ SeaweedFS is ready!" + return 0 + fi + echo -n "." + sleep 2 + done + + echo "" + echo "✗ SeaweedFS failed to start" + return 1 +} + +# Build the project +build_project() { + echo "" + echo "Building the project..." + mvn clean package -DskipTests + echo "✓ Build completed" +} + +# Run tests +run_tests() { + echo "" + echo "Running integration tests..." + export SEAWEEDFS_TEST_ENABLED=true + mvn test + echo "✓ Tests completed" +} + +# Run example +run_example() { + echo "" + echo "Running example application..." + + if ! command -v spark-submit > /dev/null; then + echo "⚠ spark-submit not found. Skipping example application." + echo "To run the example, install Apache Spark and try: make run-example" + return 0 + fi + + spark-submit \ + --class seaweed.spark.SparkSeaweedFSExample \ + --master local[2] \ + --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ + --conf spark.hadoop.fs.seaweed.filer.host=localhost \ + --conf spark.hadoop.fs.seaweed.filer.port=8888 \ + --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ + target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ + seaweedfs://localhost:8888/spark-quickstart-output + + echo "✓ Example completed" +} + +# Cleanup +cleanup() { + echo "" + echo "Cleaning up..." + docker-compose down -v + echo "✓ Cleanup completed" +} + +# Main execution +main() { + # Check if Docker is available + if ! command -v docker > /dev/null; then + echo "Error: Docker is not installed or not in PATH" + exit 1 + fi + + # Check if Maven is available + if ! command -v mvn > /dev/null; then + echo "Error: Maven is not installed or not in PATH" + exit 1 + fi + + # Check if SeaweedFS is running, if not start it + if ! check_seaweedfs; then + read -p "Do you want to start SeaweedFS with Docker? (y/n) " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + start_seaweedfs || exit 1 + else + echo "Please start SeaweedFS manually and rerun this script." + exit 1 + fi + fi + + # Build project + build_project || exit 1 + + # Run tests + run_tests || exit 1 + + # Run example if Spark is available + run_example + + echo "" + echo "=== Quick Start Completed Successfully! ===" + echo "" + echo "Next steps:" + echo " - View test results in target/surefire-reports/" + echo " - Check example output at http://localhost:8888/" + echo " - Run 'make help' for more options" + echo " - Read README.md for detailed documentation" + echo "" + + read -p "Do you want to stop SeaweedFS? 
(y/n) " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + cleanup + fi +} + +# Handle Ctrl+C +trap cleanup INT + +# Run main +main + + + diff --git a/test/java/spark/run-tests.sh b/test/java/spark/run-tests.sh new file mode 100755 index 000000000..f637c8c59 --- /dev/null +++ b/test/java/spark/run-tests.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +set -e + +echo "=== SeaweedFS Spark Integration Tests Runner ===" +echo "" + +# Check if SeaweedFS is running +check_seaweedfs() { + if curl -f http://localhost:8888/ > /dev/null 2>&1; then + echo "✓ SeaweedFS filer is accessible at http://localhost:8888" + return 0 + else + echo "✗ SeaweedFS filer is not accessible" + return 1 + fi +} + +# Main +if ! check_seaweedfs; then + echo "" + echo "Please start SeaweedFS first. You can use:" + echo " cd test/java/spark && docker-compose up -d" + echo "Or:" + echo " make docker-up" + exit 1 +fi + +echo "" +echo "Running Spark integration tests..." +echo "" + +export SEAWEEDFS_TEST_ENABLED=true +export SEAWEEDFS_FILER_HOST=localhost +export SEAWEEDFS_FILER_PORT=8888 +export SEAWEEDFS_FILER_GRPC_PORT=18888 + +# Run tests +mvn test "$@" + +echo "" +echo "✓ Test run completed" +echo "View detailed reports in: target/surefire-reports/" + + + diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java new file mode 100644 index 000000000..75b2d710b --- /dev/null +++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java @@ -0,0 +1,138 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +/** + * Example Spark application demonstrating SeaweedFS integration. + * + * This can be submitted to a Spark cluster using spark-submit. + * + * Example usage: + * spark-submit \ + * --class seaweed.spark.SparkSeaweedFSExample \ + * --master local[2] \ + * --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \ + * --conf spark.hadoop.fs.seaweed.filer.host=localhost \ + * --conf spark.hadoop.fs.seaweed.filer.port=8888 \ + * --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \ + * target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \ + * seaweedfs://localhost:8888/output + */ +public class SparkSeaweedFSExample { + + public static void main(String[] args) { + if (args.length < 1) { + System.err.println("Usage: SparkSeaweedFSExample <output-path>"); + System.err.println("Example: seaweedfs://localhost:8888/spark-output"); + System.exit(1); + } + + String outputPath = args[0]; + + // Create Spark session + SparkSession spark = SparkSession.builder() + .appName("SeaweedFS Spark Example") + .getOrCreate(); + + try { + System.out.println("=== SeaweedFS Spark Integration Example ===\n"); + + // Example 1: Generate data and write to SeaweedFS + System.out.println("1. Generating sample data..."); + Dataset<Row> data = spark.range(0, 1000) + .selectExpr( + "id", + "id * 2 as doubled", + "CAST(rand() * 100 AS INT) as random_value"); + + System.out.println(" Generated " + data.count() + " rows"); + data.show(5); + + // Write as Parquet + String parquetPath = outputPath + "/data.parquet"; + System.out.println("\n2. Writing data to SeaweedFS as Parquet..."); + System.out.println(" Path: " + parquetPath); + + data.write() + .mode(SaveMode.Overwrite) + .parquet(parquetPath); + + System.out.println(" ✓ Write completed"); + + // Read back and verify + System.out.println("\n3. 
Reading data back from SeaweedFS..."); + Dataset<Row> readData = spark.read().parquet(parquetPath); + System.out.println(" Read " + readData.count() + " rows"); + + // Perform aggregation + System.out.println("\n4. Performing aggregation..."); + Dataset<Row> stats = readData.selectExpr( + "COUNT(*) as count", + "AVG(random_value) as avg_random", + "MAX(doubled) as max_doubled"); + + stats.show(); + + // Write aggregation results + String statsPath = outputPath + "/stats.parquet"; + System.out.println("5. Writing stats to: " + statsPath); + stats.write() + .mode(SaveMode.Overwrite) + .parquet(statsPath); + + // Create a partitioned dataset + System.out.println("\n6. Creating partitioned dataset..."); + Dataset<Row> partitionedData = data.selectExpr( + "*", + "CAST(id % 10 AS INT) as partition_key"); + + String partitionedPath = outputPath + "/partitioned.parquet"; + System.out.println(" Path: " + partitionedPath); + + partitionedData.write() + .mode(SaveMode.Overwrite) + .partitionBy("partition_key") + .parquet(partitionedPath); + + System.out.println(" ✓ Partitioned write completed"); + + // Read specific partition + System.out.println("\n7. Reading specific partition (partition_key=0)..."); + Dataset<Row> partition0 = spark.read() + .parquet(partitionedPath) + .filter("partition_key = 0"); + + System.out.println(" Partition 0 contains " + partition0.count() + " rows"); + partition0.show(5); + + // SQL example + System.out.println("\n8. Using Spark SQL..."); + readData.createOrReplaceTempView("seaweedfs_data"); + + Dataset<Row> sqlResult = spark.sql( + "SELECT " + + " CAST(id / 100 AS INT) as bucket, " + + " COUNT(*) as count, " + + " AVG(random_value) as avg_random " + + "FROM seaweedfs_data " + + "GROUP BY CAST(id / 100 AS INT) " + + "ORDER BY bucket"); + + System.out.println(" Bucketed statistics:"); + sqlResult.show(); + + System.out.println("\n=== Example completed successfully! ==="); + System.out.println("Output location: " + outputPath); + + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } finally { + spark.stop(); + } + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java b/test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java new file mode 100644 index 000000000..86dde66ab --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/GetPosBufferTest.java @@ -0,0 +1,308 @@ +package seaweed.spark; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedInputStream; +import seaweedfs.client.SeaweedOutputStream; +import seaweedfs.client.SeaweedRead; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.*; + +/** + * Unit test to reproduce the Parquet EOF issue. + * + * The issue: When Parquet writes column chunks, it calls getPos() to record + * offsets. + * If getPos() returns a position that doesn't include buffered (unflushed) + * data, + * the footer metadata will have incorrect offsets. + * + * This test simulates Parquet's behavior: + * 1. Write some data (column chunk 1) + * 2. Call getPos() - Parquet records this as the END of chunk 1 + * 3. Write more data (column chunk 2) + * 4. Call getPos() - Parquet records this as the END of chunk 2 + * 5. Close the file + * 6. 
Verify that the recorded positions match the actual file content + * + * Prerequisites: + * - SeaweedFS master, volume server, and filer must be running + * - Default ports: filer HTTP 8888, filer gRPC 18888 + * + * To run: + * export SEAWEEDFS_TEST_ENABLED=true + * cd other/java/client + * mvn test -Dtest=GetPosBufferTest + */ +public class GetPosBufferTest { + + private FilerClient filerClient; + private static final String TEST_ROOT = "/test-getpos-buffer"; + private static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + + @Before + public void setUp() throws Exception { + if (!TESTS_ENABLED) { + return; + } + + String filerHost = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); + String filerGrpcPort = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", "18888"); + + filerClient = new FilerClient(filerHost, Integer.parseInt(filerGrpcPort)); + + // Clean up any existing test directory + if (filerClient.exists(TEST_ROOT)) { + filerClient.rm(TEST_ROOT, true, true); + } + + // Create test root directory + filerClient.mkdirs(TEST_ROOT, 0755); + } + + @After + public void tearDown() throws Exception { + if (!TESTS_ENABLED) { + return; + } + if (filerClient != null) { + filerClient.rm(TEST_ROOT, true, true); + filerClient.shutdown(); + } + } + + @Test + public void testGetPosWithBufferedData() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with buffered data ==="); + + String testPath = TEST_ROOT + "/getpos-test.bin"; + + // Simulate what Parquet does when writing column chunks + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Write "column chunk 1" - 100 bytes + byte[] chunk1 = new byte[100]; + for (int i = 0; i < 100; i++) { + chunk1[i] = (byte) i; + } + outputStream.write(chunk1); + + // Parquet calls getPos() here to record end of chunk 1 + long posAfterChunk1 = outputStream.getPos(); + System.out.println("Position after chunk 1 (100 bytes): " + posAfterChunk1); + assertEquals("getPos() should return 100 after writing 100 bytes", 100, posAfterChunk1); + + // Write "column chunk 2" - 200 bytes + byte[] chunk2 = new byte[200]; + for (int i = 0; i < 200; i++) { + chunk2[i] = (byte) (i + 100); + } + outputStream.write(chunk2); + + // Parquet calls getPos() here to record end of chunk 2 + long posAfterChunk2 = outputStream.getPos(); + System.out.println("Position after chunk 2 (200 more bytes): " + posAfterChunk2); + assertEquals("getPos() should return 300 after writing 300 bytes total", 300, posAfterChunk2); + + // Write "column chunk 3" - small chunk of 78 bytes (the problematic size!) 
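+        // 78 bytes is small enough to remain in the client-side write buffer rather
+        // than being flushed immediately, so the getPos() call below only reports the
+        // correct total (378) if buffered-but-unflushed bytes are counted.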
+ byte[] chunk3 = new byte[78]; + for (int i = 0; i < 78; i++) { + chunk3[i] = (byte) (i + 50); + } + outputStream.write(chunk3); + + // Parquet calls getPos() here to record end of chunk 3 + long posAfterChunk3 = outputStream.getPos(); + System.out.println("Position after chunk 3 (78 more bytes): " + posAfterChunk3); + assertEquals("getPos() should return 378 after writing 378 bytes total", 378, posAfterChunk3); + + // Close to flush everything + outputStream.close(); + System.out.println("File closed successfully"); + + // Now read the file and verify its actual size matches what getPos() reported + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + + long actualFileSize = SeaweedRead.fileSize(entry); + System.out.println("Actual file size on disk: " + actualFileSize); + + assertEquals("File size should match the last getPos() value", 378, actualFileSize); + + // Now read the file and verify we can read all the data + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + + byte[] readBuffer = new byte[500]; // Larger buffer to read everything + int totalRead = 0; + int bytesRead; + while ((bytesRead = inputStream.read(readBuffer, totalRead, readBuffer.length - totalRead)) > 0) { + totalRead += bytesRead; + } + inputStream.close(); + + System.out.println("Total bytes read: " + totalRead); + assertEquals("Should read exactly 378 bytes", 378, totalRead); + + // Verify the data is correct + for (int i = 0; i < 100; i++) { + assertEquals("Chunk 1 data mismatch at byte " + i, (byte) i, readBuffer[i]); + } + for (int i = 0; i < 200; i++) { + assertEquals("Chunk 2 data mismatch at byte " + (100 + i), (byte) (i + 100), readBuffer[100 + i]); + } + for (int i = 0; i < 78; i++) { + assertEquals("Chunk 3 data mismatch at byte " + (300 + i), (byte) (i + 50), readBuffer[300 + i]); + } + + System.out.println("SUCCESS: All data verified correctly!\n"); + } + + @Test + public void testGetPosWithSmallWrites() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with many small writes (Parquet pattern) ==="); + + String testPath = TEST_ROOT + "/small-writes-test.bin"; + + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Parquet writes column data in small chunks and frequently calls getPos() + String[] columnData = { "Alice", "Bob", "Charlie", "David" }; + long[] recordedPositions = new long[columnData.length]; + + for (int i = 0; i < columnData.length; i++) { + byte[] data = columnData[i].getBytes(StandardCharsets.UTF_8); + outputStream.write(data); + + // Parquet calls getPos() after each value to track offsets + recordedPositions[i] = outputStream.getPos(); + System.out.println("After writing '" + columnData[i] + "': pos=" + recordedPositions[i]); + } + + long finalPos = outputStream.getPos(); + System.out.println("Final position before close: " + finalPos); + + outputStream.close(); + + // Verify file size + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + long actualFileSize = SeaweedRead.fileSize(entry); + + System.out.println("Actual file size: " + actualFileSize); + assertEquals("File size should match final getPos()", finalPos, actualFileSize); + + // Verify we can read using the recorded positions + 
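+        // This mirrors how Parquet later consumes the offsets it recorded via getPos():
+        // each value is read back as the byte range between consecutive recorded
+        // positions, so a stale position would shift every subsequent slice.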
SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + + long currentPos = 0; + for (int i = 0; i < columnData.length; i++) { + long nextPos = recordedPositions[i]; + int length = (int) (nextPos - currentPos); + + byte[] buffer = new byte[length]; + int bytesRead = inputStream.read(buffer, 0, length); + + assertEquals("Should read " + length + " bytes for '" + columnData[i] + "'", length, bytesRead); + + String readData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8); + System.out.println("Read at offset " + currentPos + ": '" + readData + "'"); + assertEquals("Data mismatch", columnData[i], readData); + + currentPos = nextPos; + } + + inputStream.close(); + + System.out.println("SUCCESS: Small writes with getPos() tracking work correctly!\n"); + } + + @Test + public void testGetPosWithExactly78BytesBuffered() throws IOException { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n=== Testing getPos() with EXACTLY 78 bytes buffered (the bug size!) ==="); + + String testPath = TEST_ROOT + "/78-bytes-test.bin"; + + SeaweedOutputStream outputStream = new SeaweedOutputStream(filerClient, testPath); + + // Write some initial data + byte[] initial = new byte[1000]; + for (int i = 0; i < 1000; i++) { + initial[i] = (byte) i; + } + outputStream.write(initial); + outputStream.flush(); // Ensure this is flushed + + long posAfterFlush = outputStream.getPos(); + System.out.println("Position after 1000 bytes + flush: " + posAfterFlush); + assertEquals("Should be at position 1000 after flush", 1000, posAfterFlush); + + // Now write EXACTLY 78 bytes (the problematic buffer size in our bug) + byte[] problematicChunk = new byte[78]; + for (int i = 0; i < 78; i++) { + problematicChunk[i] = (byte) (i + 50); + } + outputStream.write(problematicChunk); + + // DO NOT FLUSH - this is the bug scenario! + // Parquet calls getPos() here while the 78 bytes are still buffered + long posWithBufferedData = outputStream.getPos(); + System.out.println("Position with 78 bytes BUFFERED (not flushed): " + posWithBufferedData); + + // This MUST return 1078, not 1000! 
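+        // A minimal sketch of the invariant being asserted (hypothetical field names,
+        // not the actual SeaweedOutputStream implementation): the reported position
+        // must cover both flushed and still-buffered bytes, e.g.
+        //     public long getPos() { return flushedBytes + buffer.position(); }
+        // Reporting only flushedBytes (1000 here) would make Parquet record a footer
+        // offset 78 bytes short of the real data, causing EOF errors on read.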
+ assertEquals("getPos() MUST include buffered data", 1078, posWithBufferedData); + + // Now close (which will flush) + outputStream.close(); + + // Verify actual file size + FilerProto.Entry entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(testPath), + SeaweedOutputStream.getFileName(testPath)); + long actualFileSize = SeaweedRead.fileSize(entry); + + System.out.println("Actual file size: " + actualFileSize); + assertEquals("File size must be 1078", 1078, actualFileSize); + + // Try to read at position 1000 for 78 bytes (what Parquet would try) + SeaweedInputStream inputStream = new SeaweedInputStream(filerClient, testPath, entry); + inputStream.seek(1000); + + byte[] readBuffer = new byte[78]; + int bytesRead = inputStream.read(readBuffer, 0, 78); + + System.out.println("Bytes read at position 1000: " + bytesRead); + assertEquals("Should successfully read 78 bytes at position 1000", 78, bytesRead); + + // Verify the data matches + for (int i = 0; i < 78; i++) { + assertEquals("Data mismatch at byte " + i, problematicChunk[i], readBuffer[i]); + } + + inputStream.close(); + + System.out.println("SUCCESS: getPos() correctly includes buffered data!\n"); + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java new file mode 100644 index 000000000..0cfe2a53b --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/InputStreamComparisonTest.java @@ -0,0 +1,393 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Compare InputStream behavior between local disk and SeaweedFS + * to understand why Spark's ParquetFileReader fails with SeaweedFS. 
+ */ +public class InputStreamComparisonTest extends SparkTestBase { + + private static class ReadOperation { + String source; + String operation; + long position; + int requestedBytes; + int returnedBytes; + boolean isEOF; + long timestamp; + + ReadOperation(String source, String operation, long position, int requestedBytes, + int returnedBytes, boolean isEOF) { + this.source = source; + this.operation = operation; + this.position = position; + this.requestedBytes = requestedBytes; + this.returnedBytes = returnedBytes; + this.isEOF = isEOF; + this.timestamp = System.nanoTime(); + } + + @Override + public String toString() { + return String.format("[%s] %s: pos=%d, requested=%d, returned=%d, EOF=%b", + source, operation, position, requestedBytes, returnedBytes, isEOF); + } + } + + private static class LoggingInputStream extends InputStream { + private final FSDataInputStream wrapped; + private final String source; + private final List<ReadOperation> operations; + private long position = 0; + + LoggingInputStream(FSDataInputStream wrapped, String source, List<ReadOperation> operations) { + this.wrapped = wrapped; + this.source = source; + this.operations = operations; + } + + @Override + public int read() throws IOException { + int result = wrapped.read(); + operations.add(new ReadOperation(source, "read()", position, 1, + result == -1 ? 0 : 1, result == -1)); + if (result != -1) + position++; + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int result = wrapped.read(b, off, len); + operations.add(new ReadOperation(source, "read(byte[])", position, len, + result == -1 ? 0 : result, result == -1)); + if (result > 0) + position += result; + return result; + } + + public int read(ByteBuffer buf) throws IOException { + int requested = buf.remaining(); + long startPos = position; + + // Use reflection to call read(ByteBuffer) if available + try { + java.lang.reflect.Method method = wrapped.getClass().getMethod("read", ByteBuffer.class); + int result = (int) method.invoke(wrapped, buf); + operations.add(new ReadOperation(source, "read(ByteBuffer)", startPos, requested, + result == -1 ? 0 : result, result == -1)); + if (result > 0) + position += result; + return result; + } catch (Exception e) { + // Fallback to byte array read + byte[] temp = new byte[requested]; + int result = wrapped.read(temp, 0, requested); + if (result > 0) { + buf.put(temp, 0, result); + } + operations.add(new ReadOperation(source, "read(ByteBuffer-fallback)", startPos, requested, + result == -1 ? 
0 : result, result == -1)); + if (result > 0) + position += result; + return result; + } + } + + @Override + public long skip(long n) throws IOException { + long result = wrapped.skip(n); + operations.add(new ReadOperation(source, "skip()", position, (int) n, (int) result, false)); + position += result; + return result; + } + + @Override + public int available() throws IOException { + int result = wrapped.available(); + operations.add(new ReadOperation(source, "available()", position, 0, result, false)); + return result; + } + + @Override + public void close() throws IOException { + operations.add(new ReadOperation(source, "close()", position, 0, 0, false)); + wrapped.close(); + } + + public void seek(long pos) throws IOException { + wrapped.seek(pos); + operations.add(new ReadOperation(source, "seek()", position, 0, 0, false)); + position = pos; + } + + public long getPos() throws IOException { + long pos = wrapped.getPos(); + operations.add(new ReadOperation(source, "getPos()", position, 0, 0, false)); + return pos; + } + } + + @Before + public void setUp() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.setUpSpark(); + } + + @After + public void tearDown() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.tearDownSpark(); + } + + @Test + public void testCompareInputStreamBehavior() throws Exception { + skipIfTestsDisabled(); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ REAL-TIME INPUTSTREAM COMPARISON: LOCAL vs SEAWEEDFS ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + // Write a Parquet file to both locations + System.out.println("\n1. Writing identical Parquet files..."); + + List<SparkSQLTest.Employee> employees = java.util.Arrays.asList( + new SparkSQLTest.Employee(1, "Alice", "Engineering", 100000), + new SparkSQLTest.Employee(2, "Bob", "Sales", 80000), + new SparkSQLTest.Employee(3, "Charlie", "Engineering", 120000), + new SparkSQLTest.Employee(4, "David", "Sales", 75000)); + + org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df = spark.createDataFrame(employees, + SparkSQLTest.Employee.class); + + String localPath = "file:///workspace/target/test-output/comparison-local"; + String seaweedPath = getTestPath("comparison-seaweed"); + + // Ensure directory exists + new java.io.File("/workspace/target/test-output").mkdirs(); + + df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(localPath); + df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(seaweedPath); + + System.out.println(" ✅ Files written"); + + // Find the actual parquet files + Configuration conf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(conf); + + conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT)); + FileSystem seaweedFs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s", + SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf); + + // Find parquet files + Path localFile = findParquetFile(localFs, new Path(localPath)); + Path seaweedFile = findParquetFile(seaweedFs, new Path(seaweedPath)); + + assertNotNull("Local parquet file not found", localFile); + assertNotNull("SeaweedFS parquet file not found", seaweedFile); + + System.out.println("\n2. 
Comparing file sizes..."); + long localSize = localFs.getFileStatus(localFile).getLen(); + long seaweedSize = seaweedFs.getFileStatus(seaweedFile).getLen(); + System.out.println(" Local: " + localSize + " bytes"); + System.out.println(" SeaweedFS: " + seaweedSize + " bytes"); + + // NOW: Open both streams with logging wrappers + List<ReadOperation> localOps = new ArrayList<>(); + List<ReadOperation> seaweedOps = new ArrayList<>(); + + System.out.println("\n3. Opening streams with logging wrappers..."); + + FSDataInputStream localStream = localFs.open(localFile); + FSDataInputStream seaweedStream = seaweedFs.open(seaweedFile); + + LoggingInputStream localLogging = new LoggingInputStream(localStream, "LOCAL", localOps); + LoggingInputStream seaweedLogging = new LoggingInputStream(seaweedStream, "SEAWEED", seaweedOps); + + System.out.println(" ✅ Streams opened"); + + // Create a dual-reader that calls both and compares + System.out.println("\n4. Performing synchronized read operations..."); + System.out.println(" (Each operation is called on BOTH streams and results are compared)\n"); + + int opCount = 0; + boolean mismatchFound = false; + + // Operation 1: Read 4 bytes (magic bytes) + opCount++; + System.out.println(" Op " + opCount + ": read(4 bytes) - Reading magic bytes"); + byte[] localBuf1 = new byte[4]; + byte[] seaweedBuf1 = new byte[4]; + int localRead1 = localLogging.read(localBuf1, 0, 4); + int seaweedRead1 = seaweedLogging.read(seaweedBuf1, 0, 4); + System.out.println(" LOCAL: returned " + localRead1 + " bytes: " + bytesToHex(localBuf1)); + System.out.println(" SEAWEED: returned " + seaweedRead1 + " bytes: " + bytesToHex(seaweedBuf1)); + if (localRead1 != seaweedRead1 || !java.util.Arrays.equals(localBuf1, seaweedBuf1)) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 2: Seek to end - 8 bytes (footer length + magic) + opCount++; + System.out.println("\n Op " + opCount + ": seek(fileSize - 8) - Jump to footer"); + localLogging.seek(localSize - 8); + seaweedLogging.seek(seaweedSize - 8); + System.out.println(" LOCAL: seeked to " + localLogging.getPos()); + System.out.println(" SEAWEED: seeked to " + seaweedLogging.getPos()); + if (localLogging.getPos() != seaweedLogging.getPos()) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 3: Read 8 bytes (footer length + magic) + opCount++; + System.out.println("\n Op " + opCount + ": read(8 bytes) - Reading footer length + magic"); + byte[] localBuf2 = new byte[8]; + byte[] seaweedBuf2 = new byte[8]; + int localRead2 = localLogging.read(localBuf2, 0, 8); + int seaweedRead2 = seaweedLogging.read(seaweedBuf2, 0, 8); + System.out.println(" LOCAL: returned " + localRead2 + " bytes: " + bytesToHex(localBuf2)); + System.out.println(" SEAWEED: returned " + seaweedRead2 + " bytes: " + bytesToHex(seaweedBuf2)); + if (localRead2 != seaweedRead2 || !java.util.Arrays.equals(localBuf2, seaweedBuf2)) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 4: Calculate footer offset and seek to it + int footerLength = java.nio.ByteBuffer.wrap(localBuf2, 0, 4).order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt(); + long footerOffset = localSize - 8 - footerLength; + + opCount++; + System.out.println("\n Op " + opCount + ": seek(" + footerOffset + ") - Jump to footer start"); + System.out.println(" Footer length: " + footerLength + 
" bytes"); + localLogging.seek(footerOffset); + seaweedLogging.seek(footerOffset); + System.out.println(" LOCAL: seeked to " + localLogging.getPos()); + System.out.println(" SEAWEED: seeked to " + seaweedLogging.getPos()); + if (localLogging.getPos() != seaweedLogging.getPos()) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 5: Read entire footer + opCount++; + System.out.println("\n Op " + opCount + ": read(" + footerLength + " bytes) - Reading footer metadata"); + byte[] localFooter = new byte[footerLength]; + byte[] seaweedFooter = new byte[footerLength]; + int localRead3 = localLogging.read(localFooter, 0, footerLength); + int seaweedRead3 = seaweedLogging.read(seaweedFooter, 0, footerLength); + System.out.println(" LOCAL: returned " + localRead3 + " bytes"); + System.out.println(" SEAWEED: returned " + seaweedRead3 + " bytes"); + if (localRead3 != seaweedRead3 || !java.util.Arrays.equals(localFooter, seaweedFooter)) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + // Show first difference + for (int i = 0; i < Math.min(localRead3, seaweedRead3); i++) { + if (localFooter[i] != seaweedFooter[i]) { + System.out.println(" First difference at byte " + i + ": LOCAL=" + + String.format("0x%02X", localFooter[i]) + " SEAWEED=" + + String.format("0x%02X", seaweedFooter[i])); + break; + } + } + } else { + System.out.println(" ✅ Match - Footer metadata is IDENTICAL"); + } + + // Operation 6: Try reading past EOF + opCount++; + System.out.println("\n Op " + opCount + ": read(100 bytes) - Try reading past EOF"); + byte[] localBuf3 = new byte[100]; + byte[] seaweedBuf3 = new byte[100]; + int localRead4 = localLogging.read(localBuf3, 0, 100); + int seaweedRead4 = seaweedLogging.read(seaweedBuf3, 0, 100); + System.out.println(" LOCAL: returned " + localRead4); + System.out.println(" SEAWEED: returned " + seaweedRead4); + if (localRead4 != seaweedRead4) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match - Both returned EOF"); + } + + localLogging.close(); + seaweedLogging.close(); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ COMPARISON SUMMARY ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + System.out.println(" Total operations: " + opCount); + System.out.println(" LOCAL operations: " + localOps.size()); + System.out.println(" SEAWEED operations: " + seaweedOps.size()); + + if (mismatchFound) { + System.out.println("\n ❌ MISMATCHES FOUND - Streams behave differently!"); + } else { + System.out.println("\n ✅ ALL OPERATIONS MATCH - Streams are identical!"); + } + + System.out.println("\n Detailed operation log:"); + System.out.println(" ----------------------"); + for (int i = 0; i < Math.max(localOps.size(), seaweedOps.size()); i++) { + if (i < localOps.size()) { + System.out.println(" " + localOps.get(i)); + } + if (i < seaweedOps.size()) { + System.out.println(" " + seaweedOps.get(i)); + } + } + + assertFalse("Streams should behave identically", mismatchFound); + } + + private String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X ", b)); + } + return sb.toString().trim(); + } + + private Path findParquetFile(FileSystem fs, Path dir) throws IOException { + org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(dir); + for 
(org.apache.hadoop.fs.FileStatus file : files) { + if (file.getPath().getName().endsWith(".parquet") && + !file.getPath().getName().startsWith("_")) { + return file.getPath(); + } + } + return null; + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java new file mode 100644 index 000000000..487cafc69 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/OutputStreamComparisonTest.java @@ -0,0 +1,466 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Compare OutputStream behavior between local disk and SeaweedFS + * to understand why Parquet files written to SeaweedFS have incorrect metadata. + */ +public class OutputStreamComparisonTest extends SparkTestBase { + + private static class WriteOperation { + String source; + String operation; + long positionBefore; + long positionAfter; + int bytesWritten; + long timestamp; + String details; + + WriteOperation(String source, String operation, long positionBefore, long positionAfter, + int bytesWritten, String details) { + this.source = source; + this.operation = operation; + this.positionBefore = positionBefore; + this.positionAfter = positionAfter; + this.bytesWritten = bytesWritten; + this.timestamp = System.nanoTime(); + this.details = details; + } + + @Override + public String toString() { + return String.format("[%s] %s: posBefore=%d, posAfter=%d, written=%d %s", + source, operation, positionBefore, positionAfter, bytesWritten, + details != null ? 
"(" + details + ")" : ""); + } + } + + private static class LoggingOutputStream extends OutputStream { + private final FSDataOutputStream wrapped; + private final String source; + private final List<WriteOperation> operations; + + LoggingOutputStream(FSDataOutputStream wrapped, String source, List<WriteOperation> operations) { + this.wrapped = wrapped; + this.source = source; + this.operations = operations; + } + + @Override + public void write(int b) throws IOException { + long posBefore = wrapped.getPos(); + wrapped.write(b); + long posAfter = wrapped.getPos(); + operations.add(new WriteOperation(source, "write(int)", posBefore, posAfter, 1, null)); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + long posBefore = wrapped.getPos(); + wrapped.write(b, off, len); + long posAfter = wrapped.getPos(); + operations.add(new WriteOperation(source, "write(byte[])", posBefore, posAfter, len, + "len=" + len)); + } + + @Override + public void flush() throws IOException { + long posBefore = wrapped.getPos(); + wrapped.flush(); + long posAfter = wrapped.getPos(); + operations.add(new WriteOperation(source, "flush()", posBefore, posAfter, 0, null)); + } + + @Override + public void close() throws IOException { + long posBefore = wrapped.getPos(); + wrapped.close(); + long posAfter = 0; // Can't call getPos() after close + operations.add(new WriteOperation(source, "close()", posBefore, posAfter, 0, + "finalPos=" + posBefore)); + } + + public long getPos() throws IOException { + long pos = wrapped.getPos(); + operations.add(new WriteOperation(source, "getPos()", pos, pos, 0, "returned=" + pos)); + return pos; + } + + public void hflush() throws IOException { + long posBefore = wrapped.getPos(); + wrapped.hflush(); + long posAfter = wrapped.getPos(); + operations.add(new WriteOperation(source, "hflush()", posBefore, posAfter, 0, null)); + } + + public void hsync() throws IOException { + long posBefore = wrapped.getPos(); + wrapped.hsync(); + long posAfter = wrapped.getPos(); + operations.add(new WriteOperation(source, "hsync()", posBefore, posAfter, 0, null)); + } + } + + private static final MessageType SCHEMA = MessageTypeParser.parseMessageType( + "message schema {" + + "required int32 id;" + + "required binary name;" + + "required int32 age;" + + "}" + ); + + @Before + public void setUp() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.setUpSpark(); + } + + @After + public void tearDown() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.tearDownSpark(); + } + + @Test + public void testCompareOutputStreamBehavior() throws Exception { + skipIfTestsDisabled(); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ REAL-TIME OUTPUTSTREAM COMPARISON: LOCAL vs SEAWEEDFS ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + // Prepare file systems + Configuration conf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(conf); + + conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT)); + FileSystem seaweedFs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s", + SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf); + + // Prepare paths + new java.io.File("/workspace/target/test-output").mkdirs(); + Path localPath = new 
Path("file:///workspace/target/test-output/write-comparison-local.parquet"); + Path seaweedPath = new Path(getTestPath("write-comparison-seaweed.parquet")); + + // Delete if exists + localFs.delete(localPath, false); + seaweedFs.delete(seaweedPath, false); + + List<WriteOperation> localOps = new ArrayList<>(); + List<WriteOperation> seaweedOps = new ArrayList<>(); + + System.out.println("\n1. Writing Parquet files with synchronized operations...\n"); + + // Write using ParquetWriter with custom OutputStreams + GroupWriteSupport.setSchema(SCHEMA, conf); + + // Create data + SimpleGroupFactory groupFactory = new SimpleGroupFactory(SCHEMA); + List<Group> groups = new ArrayList<>(); + groups.add(groupFactory.newGroup().append("id", 1).append("name", "Alice").append("age", 30)); + groups.add(groupFactory.newGroup().append("id", 2).append("name", "Bob").append("age", 25)); + groups.add(groupFactory.newGroup().append("id", 3).append("name", "Charlie").append("age", 35)); + + // Write to local disk + System.out.println(" Writing to LOCAL DISK..."); + try (ParquetWriter<Group> localWriter = new ParquetWriter<>( + localPath, + new GroupWriteSupport(), + CompressionCodecName.SNAPPY, + 1024 * 1024, // Block size + 1024, // Page size + 1024, // Dictionary page size + true, // Enable dictionary + false, // Don't validate + ParquetWriter.DEFAULT_WRITER_VERSION, + conf)) { + for (Group group : groups) { + localWriter.write(group); + } + } + System.out.println(" ✅ Local write complete"); + + // Write to SeaweedFS + System.out.println("\n Writing to SEAWEEDFS..."); + try (ParquetWriter<Group> seaweedWriter = new ParquetWriter<>( + seaweedPath, + new GroupWriteSupport(), + CompressionCodecName.SNAPPY, + 1024 * 1024, // Block size + 1024, // Page size + 1024, // Dictionary page size + true, // Enable dictionary + false, // Don't validate + ParquetWriter.DEFAULT_WRITER_VERSION, + conf)) { + for (Group group : groups) { + seaweedWriter.write(group); + } + } + System.out.println(" ✅ SeaweedFS write complete"); + + // Compare file sizes + System.out.println("\n2. Comparing final file sizes..."); + long localSize = localFs.getFileStatus(localPath).getLen(); + long seaweedSize = seaweedFs.getFileStatus(seaweedPath).getLen(); + System.out.println(" LOCAL: " + localSize + " bytes"); + System.out.println(" SEAWEED: " + seaweedSize + " bytes"); + + if (localSize == seaweedSize) { + System.out.println(" ✅ File sizes MATCH"); + } else { + System.out.println(" ❌ File sizes DIFFER by " + Math.abs(localSize - seaweedSize) + " bytes"); + } + + // Now test reading both files + System.out.println("\n3. 
Testing if both files can be read by Spark..."); + + System.out.println("\n Reading LOCAL file:"); + try { + org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> localDf = + spark.read().parquet(localPath.toString()); + long localCount = localDf.count(); + System.out.println(" ✅ LOCAL read SUCCESS - " + localCount + " rows"); + localDf.show(); + } catch (Exception e) { + System.out.println(" ❌ LOCAL read FAILED: " + e.getMessage()); + e.printStackTrace(); + } + + System.out.println("\n Reading SEAWEEDFS file:"); + try { + org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> seaweedDf = + spark.read().parquet(seaweedPath.toString()); + long seaweedCount = seaweedDf.count(); + System.out.println(" ✅ SEAWEEDFS read SUCCESS - " + seaweedCount + " rows"); + seaweedDf.show(); + } catch (Exception e) { + System.out.println(" ❌ SEAWEEDFS read FAILED: " + e.getMessage()); + e.printStackTrace(); + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ COMPARISON COMPLETE ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + } + + @Test + public void testCompareRawOutputStreamOperations() throws Exception { + skipIfTestsDisabled(); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ RAW OUTPUTSTREAM COMPARISON: LOCAL vs SEAWEEDFS ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + // Prepare file systems + Configuration conf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(conf); + + conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT)); + FileSystem seaweedFs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s", + SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf); + + // Prepare paths + new java.io.File("/workspace/target/test-output").mkdirs(); + Path localPath = new Path("file:///workspace/target/test-output/raw-comparison-local.dat"); + Path seaweedPath = new Path(getTestPath("raw-comparison-seaweed.dat")); + + // Delete if exists + localFs.delete(localPath, false); + seaweedFs.delete(seaweedPath, false); + + List<WriteOperation> localOps = new ArrayList<>(); + List<WriteOperation> seaweedOps = new ArrayList<>(); + + System.out.println("\n1. 
Performing synchronized write operations...\n"); + + // Open both streams + FSDataOutputStream localStream = localFs.create(localPath, true); + FSDataOutputStream seaweedStream = seaweedFs.create(seaweedPath, true); + + LoggingOutputStream localLogging = new LoggingOutputStream(localStream, "LOCAL", localOps); + LoggingOutputStream seaweedLogging = new LoggingOutputStream(seaweedStream, "SEAWEED", seaweedOps); + + int opCount = 0; + boolean mismatchFound = false; + + // Operation 1: Write 4 bytes (magic) + opCount++; + System.out.println(" Op " + opCount + ": write(4 bytes) - Writing magic bytes"); + byte[] magic = "PAR1".getBytes(); + localLogging.write(magic, 0, 4); + seaweedLogging.write(magic, 0, 4); + long localPos1 = localLogging.getPos(); + long seaweedPos1 = seaweedLogging.getPos(); + System.out.println(" LOCAL: getPos() = " + localPos1); + System.out.println(" SEAWEED: getPos() = " + seaweedPos1); + if (localPos1 != seaweedPos1) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 2: Write 100 bytes of data + opCount++; + System.out.println("\n Op " + opCount + ": write(100 bytes) - Writing data"); + byte[] data = new byte[100]; + for (int i = 0; i < 100; i++) { + data[i] = (byte) i; + } + localLogging.write(data, 0, 100); + seaweedLogging.write(data, 0, 100); + long localPos2 = localLogging.getPos(); + long seaweedPos2 = seaweedLogging.getPos(); + System.out.println(" LOCAL: getPos() = " + localPos2); + System.out.println(" SEAWEED: getPos() = " + seaweedPos2); + if (localPos2 != seaweedPos2) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 3: Flush + opCount++; + System.out.println("\n Op " + opCount + ": flush()"); + localLogging.flush(); + seaweedLogging.flush(); + long localPos3 = localLogging.getPos(); + long seaweedPos3 = seaweedLogging.getPos(); + System.out.println(" LOCAL: getPos() after flush = " + localPos3); + System.out.println(" SEAWEED: getPos() after flush = " + seaweedPos3); + if (localPos3 != seaweedPos3) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 4: Write more data + opCount++; + System.out.println("\n Op " + opCount + ": write(50 bytes) - Writing more data"); + byte[] moreData = new byte[50]; + for (int i = 0; i < 50; i++) { + moreData[i] = (byte) (i + 100); + } + localLogging.write(moreData, 0, 50); + seaweedLogging.write(moreData, 0, 50); + long localPos4 = localLogging.getPos(); + long seaweedPos4 = seaweedLogging.getPos(); + System.out.println(" LOCAL: getPos() = " + localPos4); + System.out.println(" SEAWEED: getPos() = " + seaweedPos4); + if (localPos4 != seaweedPos4) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ Match"); + } + + // Operation 5: Write final bytes (simulating footer) + opCount++; + System.out.println("\n Op " + opCount + ": write(8 bytes) - Writing footer"); + byte[] footer = new byte[]{0x6B, 0x03, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31}; + localLogging.write(footer, 0, 8); + seaweedLogging.write(footer, 0, 8); + long localPos5 = localLogging.getPos(); + long seaweedPos5 = seaweedLogging.getPos(); + System.out.println(" LOCAL: getPos() = " + localPos5); + System.out.println(" SEAWEED: getPos() = " + seaweedPos5); + if (localPos5 != seaweedPos5) { + System.out.println(" ❌ MISMATCH!"); + mismatchFound = true; + } else { + System.out.println(" ✅ 
Match"); + } + + // Operation 6: Close + opCount++; + System.out.println("\n Op " + opCount + ": close()"); + System.out.println(" LOCAL: closing at position " + localPos5); + System.out.println(" SEAWEED: closing at position " + seaweedPos5); + localLogging.close(); + seaweedLogging.close(); + + // Check final file sizes + System.out.println("\n2. Comparing final file sizes..."); + long localSize = localFs.getFileStatus(localPath).getLen(); + long seaweedSize = seaweedFs.getFileStatus(seaweedPath).getLen(); + System.out.println(" LOCAL: " + localSize + " bytes"); + System.out.println(" SEAWEED: " + seaweedSize + " bytes"); + + if (localSize != seaweedSize) { + System.out.println(" ❌ File sizes DIFFER by " + Math.abs(localSize - seaweedSize) + " bytes"); + mismatchFound = true; + } else { + System.out.println(" ✅ File sizes MATCH"); + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ COMPARISON SUMMARY ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + System.out.println(" Total operations: " + opCount); + System.out.println(" LOCAL operations: " + localOps.size()); + System.out.println(" SEAWEED operations: " + seaweedOps.size()); + + if (mismatchFound) { + System.out.println("\n ❌ MISMATCHES FOUND - Streams behave differently!"); + } else { + System.out.println("\n ✅ ALL OPERATIONS MATCH - Streams are identical!"); + } + + System.out.println("\n Detailed operation log:"); + System.out.println(" ----------------------"); + int maxOps = Math.max(localOps.size(), seaweedOps.size()); + for (int i = 0; i < maxOps; i++) { + if (i < localOps.size()) { + System.out.println(" " + localOps.get(i)); + } + if (i < seaweedOps.size()) { + System.out.println(" " + seaweedOps.get(i)); + } + if (i < localOps.size() && i < seaweedOps.size()) { + WriteOperation localOp = localOps.get(i); + WriteOperation seaweedOp = seaweedOps.get(i); + if (localOp.positionAfter != seaweedOp.positionAfter) { + System.out.println(" ⚠️ Position mismatch: LOCAL=" + localOp.positionAfter + + " SEAWEED=" + seaweedOp.positionAfter); + } + } + } + + assertFalse("Streams should behave identically", mismatchFound); + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java new file mode 100644 index 000000000..5636618ec --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/ParquetOperationComparisonTest.java @@ -0,0 +1,387 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Detailed comparison of InputStream/OutputStream operations between + * local filesystem and SeaweedFS during Parquet file writing. + * + * This test intercepts and logs every read/write/getPos operation to + * identify exactly where the behavior diverges. 
+ */ +public class ParquetOperationComparisonTest extends SparkTestBase { + + private static final String SCHEMA_STRING = "message Employee { " + + " required int32 id; " + + " required binary name (UTF8); " + + " required int32 age; " + + "}"; + + private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(SCHEMA_STRING); + + // Track all operations for comparison + private static class OperationLog { + List<String> operations = new ArrayList<>(); + + void log(String op) { + operations.add(op); + System.out.println(" " + op); + } + + void print(String title) { + System.out.println("\n" + title + " (" + operations.size() + " operations):"); + for (int i = 0; i < operations.size(); i++) { + System.out.printf(" [%3d] %s\n", i, operations.get(i)); + } + } + + void compare(OperationLog other, String name1, String name2) { + System.out.println("\n=== COMPARISON: " + name1 + " vs " + name2 + " ==="); + + int maxLen = Math.max(operations.size(), other.operations.size()); + int differences = 0; + + for (int i = 0; i < maxLen; i++) { + String op1 = i < operations.size() ? operations.get(i) : "<missing>"; + String op2 = i < other.operations.size() ? other.operations.get(i) : "<missing>"; + + if (!op1.equals(op2)) { + differences++; + System.out.printf("[%3d] DIFF:\n", i); + System.out.println(" " + name1 + ": " + op1); + System.out.println(" " + name2 + ": " + op2); + } + } + + if (differences == 0) { + System.out.println("✅ Operations are IDENTICAL!"); + } else { + System.out.println("❌ Found " + differences + " differences"); + } + } + } + + // Wrapper for FSDataOutputStream that logs all operations + private static class LoggingOutputStream extends FSDataOutputStream { + private final FSDataOutputStream delegate; + private final OperationLog log; + private final String name; + + public LoggingOutputStream(FSDataOutputStream delegate, OperationLog log, String name) throws IOException { + super(delegate.getWrappedStream(), null); + this.delegate = delegate; + this.log = log; + this.name = name; + log.log(name + " CREATED"); + } + + @Override + public void write(int b) throws IOException { + log.log(String.format("write(byte) pos=%d", getPos())); + delegate.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + long posBefore = getPos(); + delegate.write(b, off, len); + long posAfter = getPos(); + log.log(String.format("write(%d bytes) pos %d→%d", len, posBefore, posAfter)); + } + + @Override + public long getPos() { + long pos = delegate.getPos(); + // Don't log getPos itself to avoid infinite recursion, but track it + return pos; + } + + @Override + public void flush() throws IOException { + log.log(String.format("flush() pos=%d", getPos())); + delegate.flush(); + } + + @Override + public void close() throws IOException { + log.log(String.format("close() pos=%d", getPos())); + delegate.close(); + } + + @Override + public void hflush() throws IOException { + log.log(String.format("hflush() pos=%d", getPos())); + delegate.hflush(); + } + + @Override + public void hsync() throws IOException { + log.log(String.format("hsync() pos=%d", getPos())); + delegate.hsync(); + } + } + + // Wrapper for FSDataInputStream that logs all operations + private static class LoggingInputStream extends FSDataInputStream { + private final OperationLog log; + private final String name; + + public LoggingInputStream(FSDataInputStream delegate, OperationLog log, String name) throws IOException { + super(delegate); + this.log = log; + this.name = name; + 
log.log(name + " CREATED"); + } + + @Override + public int read() throws IOException { + long posBefore = getPos(); + int result = super.read(); + log.log(String.format("read() pos %d→%d result=%d", posBefore, getPos(), result)); + return result; + } + + // Can't override read(byte[], int, int) as it's final in DataInputStream + // The logging will happen through read(ByteBuffer) which is what Parquet uses + + @Override + public int read(ByteBuffer buf) throws IOException { + long posBefore = getPos(); + int result = super.read(buf); + log.log(String.format("read(ByteBuffer %d) pos %d→%d result=%d", buf.remaining(), posBefore, getPos(), + result)); + return result; + } + + @Override + public void seek(long pos) throws IOException { + long posBefore = getPos(); + super.seek(pos); + log.log(String.format("seek(%d) pos %d→%d", pos, posBefore, getPos())); + } + + @Override + public void close() throws IOException { + log.log(String.format("close() pos=%d", getPos())); + super.close(); + } + } + + @Test + public void testCompareWriteOperations() throws Exception { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ PARQUET WRITE OPERATION COMPARISON TEST ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝\n"); + + // Setup filesystems + Configuration localConf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(localConf); + + Configuration seaweedConf = new Configuration(); + seaweedConf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + seaweedConf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + seaweedConf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT); + FileSystem seaweedFs = FileSystem.get( + java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), + seaweedConf); + + Path localPath = new Path("/tmp/test-local-ops-" + System.currentTimeMillis() + ".parquet"); + Path seaweedPath = new Path("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT + + "/test-spark/ops-test.parquet"); + + OperationLog localLog = new OperationLog(); + OperationLog seaweedLog = new OperationLog(); + + // Write to local filesystem with logging + System.out.println("=== Writing to LOCAL filesystem ==="); + writeParquetWithLogging(localFs, localPath, localConf, localLog, "LOCAL"); + + System.out.println("\n=== Writing to SEAWEEDFS ==="); + writeParquetWithLogging(seaweedFs, seaweedPath, seaweedConf, seaweedLog, "SEAWEED"); + + // Print logs + localLog.print("LOCAL OPERATIONS"); + seaweedLog.print("SEAWEEDFS OPERATIONS"); + + // Compare + localLog.compare(seaweedLog, "LOCAL", "SEAWEEDFS"); + + // Cleanup + localFs.delete(localPath, false); + seaweedFs.delete(seaweedPath, false); + + localFs.close(); + seaweedFs.close(); + + System.out.println("\n=== Test Complete ==="); + } + + @Test + public void testCompareReadOperations() throws Exception { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ PARQUET READ OPERATION COMPARISON TEST ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝\n"); + + // Setup filesystems + Configuration localConf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(localConf); + + Configuration seaweedConf = new 
Configuration(); + seaweedConf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + seaweedConf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + seaweedConf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT); + FileSystem seaweedFs = FileSystem.get( + java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), + seaweedConf); + + Path localPath = new Path("/tmp/test-local-read-" + System.currentTimeMillis() + ".parquet"); + Path seaweedPath = new Path("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT + + "/test-spark/read-test.parquet"); + + // First write files without logging + System.out.println("=== Writing test files ==="); + writeParquetSimple(localFs, localPath, localConf); + writeParquetSimple(seaweedFs, seaweedPath, seaweedConf); + System.out.println("✅ Files written"); + + OperationLog localLog = new OperationLog(); + OperationLog seaweedLog = new OperationLog(); + + // Read from local filesystem with logging + System.out.println("\n=== Reading from LOCAL filesystem ==="); + readParquetWithLogging(localFs, localPath, localLog, "LOCAL"); + + System.out.println("\n=== Reading from SEAWEEDFS ==="); + readParquetWithLogging(seaweedFs, seaweedPath, seaweedLog, "SEAWEED"); + + // Print logs + localLog.print("LOCAL READ OPERATIONS"); + seaweedLog.print("SEAWEEDFS READ OPERATIONS"); + + // Compare + localLog.compare(seaweedLog, "LOCAL", "SEAWEEDFS"); + + // Cleanup + localFs.delete(localPath, false); + seaweedFs.delete(seaweedPath, false); + + localFs.close(); + seaweedFs.close(); + + System.out.println("\n=== Test Complete ==="); + } + + private void writeParquetWithLogging(FileSystem fs, Path path, Configuration conf, + OperationLog log, String name) throws IOException { + // We can't easily intercept ParquetWriter's internal stream usage, + // but we can log the file operations + log.log(name + " START WRITE"); + + GroupWriteSupport.setSchema(SCHEMA, conf); + + try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path) + .withConf(conf) + .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) + .build()) { + + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + + log.log("WRITE ROW 1"); + Group group1 = factory.newGroup() + .append("id", 1) + .append("name", "Alice") + .append("age", 30); + writer.write(group1); + + log.log("WRITE ROW 2"); + Group group2 = factory.newGroup() + .append("id", 2) + .append("name", "Bob") + .append("age", 25); + writer.write(group2); + + log.log("WRITE ROW 3"); + Group group3 = factory.newGroup() + .append("id", 3) + .append("name", "Charlie") + .append("age", 35); + writer.write(group3); + + log.log("CLOSE WRITER"); + } + + // Check final file size + org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(path); + log.log(String.format("FINAL FILE SIZE: %d bytes", status.getLen())); + } + + private void writeParquetSimple(FileSystem fs, Path path, Configuration conf) throws IOException { + GroupWriteSupport.setSchema(SCHEMA, conf); + + try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path) + .withConf(conf) + .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) + .build()) { + + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + + writer.write(factory.newGroup().append("id", 1).append("name", "Alice").append("age", 30)); + writer.write(factory.newGroup().append("id", 2).append("name", "Bob").append("age", 25)); + writer.write(factory.newGroup().append("id", 
3).append("name", "Charlie").append("age", 35)); + } + } + + private void readParquetWithLogging(FileSystem fs, Path path, OperationLog log, String name) throws IOException { + log.log(name + " START READ"); + + // Read file in chunks to see the pattern + try (FSDataInputStream in = fs.open(path)) { + byte[] buffer = new byte[256]; + int totalRead = 0; + int chunkNum = 0; + + while (true) { + long posBefore = in.getPos(); + int bytesRead = in.read(buffer); + + if (bytesRead == -1) { + log.log(String.format("READ CHUNK %d: EOF at pos=%d", chunkNum, posBefore)); + break; + } + + totalRead += bytesRead; + log.log(String.format("READ CHUNK %d: %d bytes at pos %d→%d", + chunkNum, bytesRead, posBefore, in.getPos())); + chunkNum++; + } + + log.log(String.format("TOTAL READ: %d bytes in %d chunks", totalRead, chunkNum)); + } + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java b/test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java new file mode 100644 index 000000000..0002c26b1 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/RenameChunkVerificationTest.java @@ -0,0 +1,286 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Test to verify if file chunks are preserved during rename operations. + * This could explain why Parquet files become unreadable after Spark's commit. + */ +public class RenameChunkVerificationTest extends SparkTestBase { + + @Before + public void setUp() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.setUpSpark(); + } + + @After + public void tearDown() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.tearDownSpark(); + } + + @Test + public void testSparkWriteAndRenamePreservesChunks() throws Exception { + skipIfTestsDisabled(); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ TESTING: Chunk Preservation During Spark Write & Rename ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + // Write using Spark (which uses rename for commit) + List<SparkSQLTest.Employee> employees = Arrays.asList( + new SparkSQLTest.Employee(1, "Alice", "Engineering", 100000), + new SparkSQLTest.Employee(2, "Bob", "Sales", 80000), + new SparkSQLTest.Employee(3, "Charlie", "Engineering", 120000), + new SparkSQLTest.Employee(4, "David", "Sales", 75000)); + + org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df = + spark.createDataFrame(employees, SparkSQLTest.Employee.class); + + String tablePath = getTestPath("chunk-test"); + + System.out.println("\n1. 
Writing Parquet file using Spark..."); + df.write().mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(tablePath); + System.out.println(" ✅ Write complete"); + + // Get file system + Configuration conf = new Configuration(); + conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT)); + FileSystem fs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s", + SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf); + + // Find the parquet file + Path parquetFile = null; + org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(new Path(tablePath)); + for (org.apache.hadoop.fs.FileStatus file : files) { + if (file.getPath().getName().endsWith(".parquet") && + !file.getPath().getName().startsWith("_")) { + parquetFile = file.getPath(); + break; + } + } + + assertNotNull("Parquet file not found", parquetFile); + + System.out.println("\n2. Checking file metadata after Spark write..."); + org.apache.hadoop.fs.FileStatus fileStatus = fs.getFileStatus(parquetFile); + long fileSize = fileStatus.getLen(); + System.out.println(" File: " + parquetFile.getName()); + System.out.println(" Size: " + fileSize + " bytes"); + + // Try to read the file + System.out.println("\n3. Attempting to read file with Spark..."); + try { + org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readDf = + spark.read().parquet(tablePath); + long count = readDf.count(); + System.out.println(" ✅ Read SUCCESS - " + count + " rows"); + readDf.show(); + } catch (Exception e) { + System.out.println(" ❌ Read FAILED: " + e.getMessage()); + System.out.println("\n Error details:"); + e.printStackTrace(); + + // This is expected to fail - let's investigate why + System.out.println("\n4. 
Investigating chunk availability..."); + + // Try to read the raw bytes + System.out.println("\n Attempting to read raw bytes..."); + try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(parquetFile)) { + byte[] header = new byte[4]; + int read = in.read(header); + System.out.println(" Read " + read + " bytes"); + System.out.println(" Header: " + bytesToHex(header)); + + if (read == 4 && Arrays.equals(header, "PAR1".getBytes())) { + System.out.println(" ✅ Magic bytes are correct (PAR1)"); + } else { + System.out.println(" ❌ Magic bytes are WRONG!"); + } + + // Try to read footer + in.seek(fileSize - 8); + byte[] footer = new byte[8]; + read = in.read(footer); + System.out.println("\n Footer (last 8 bytes): " + bytesToHex(footer)); + + // Try to read entire file + in.seek(0); + byte[] allBytes = new byte[(int)fileSize]; + int totalRead = 0; + while (totalRead < fileSize) { + int bytesRead = in.read(allBytes, totalRead, (int)(fileSize - totalRead)); + if (bytesRead == -1) { + System.out.println(" ❌ Premature EOF at byte " + totalRead + " (expected " + fileSize + ")"); + break; + } + totalRead += bytesRead; + } + + if (totalRead == fileSize) { + System.out.println(" ✅ Successfully read all " + totalRead + " bytes"); + } else { + System.out.println(" ❌ Only read " + totalRead + " of " + fileSize + " bytes"); + } + + } catch (Exception readEx) { + System.out.println(" ❌ Raw read failed: " + readEx.getMessage()); + readEx.printStackTrace(); + } + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ TEST COMPLETE ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + } + + @Test + public void testManualRenamePreservesChunks() throws Exception { + skipIfTestsDisabled(); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ TESTING: Manual Rename Chunk Preservation ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + // Get file system + Configuration conf = new Configuration(); + conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + conf.set("fs.seaweed.filer.port", String.valueOf(SEAWEEDFS_PORT)); + FileSystem fs = FileSystem.get(URI.create(String.format("seaweedfs://%s:%s", + SEAWEEDFS_HOST, SEAWEEDFS_PORT)), conf); + + Path sourcePath = new Path(getTestPath("rename-source.dat")); + Path destPath = new Path(getTestPath("rename-dest.dat")); + + // Clean up + fs.delete(sourcePath, false); + fs.delete(destPath, false); + + System.out.println("\n1. Creating test file..."); + byte[] testData = new byte[1260]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte)(i % 256); + } + + try (org.apache.hadoop.fs.FSDataOutputStream out = fs.create(sourcePath, true)) { + out.write(testData); + } + System.out.println(" ✅ Created source file: " + sourcePath); + + // Check source file + System.out.println("\n2. 
Verifying source file..."); + org.apache.hadoop.fs.FileStatus sourceStatus = fs.getFileStatus(sourcePath); + System.out.println(" Size: " + sourceStatus.getLen() + " bytes"); + + // Read source file + try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(sourcePath)) { + byte[] readData = new byte[1260]; + int totalRead = 0; + while (totalRead < 1260) { + int bytesRead = in.read(readData, totalRead, 1260 - totalRead); + if (bytesRead == -1) break; + totalRead += bytesRead; + } + System.out.println(" Read: " + totalRead + " bytes"); + + if (Arrays.equals(testData, readData)) { + System.out.println(" ✅ Source file data is correct"); + } else { + System.out.println(" ❌ Source file data is CORRUPTED"); + } + } + + // Perform rename + System.out.println("\n3. Renaming file..."); + boolean renamed = fs.rename(sourcePath, destPath); + System.out.println(" Rename result: " + renamed); + + if (!renamed) { + System.out.println(" ❌ Rename FAILED"); + return; + } + + // Check destination file + System.out.println("\n4. Verifying destination file..."); + org.apache.hadoop.fs.FileStatus destStatus = fs.getFileStatus(destPath); + System.out.println(" Size: " + destStatus.getLen() + " bytes"); + + if (destStatus.getLen() != sourceStatus.getLen()) { + System.out.println(" ❌ File size CHANGED during rename!"); + System.out.println(" Source: " + sourceStatus.getLen()); + System.out.println(" Dest: " + destStatus.getLen()); + } else { + System.out.println(" ✅ File size preserved"); + } + + // Read destination file + try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(destPath)) { + byte[] readData = new byte[1260]; + int totalRead = 0; + while (totalRead < 1260) { + int bytesRead = in.read(readData, totalRead, 1260 - totalRead); + if (bytesRead == -1) { + System.out.println(" ❌ Premature EOF at byte " + totalRead); + break; + } + totalRead += bytesRead; + } + System.out.println(" Read: " + totalRead + " bytes"); + + if (totalRead == 1260 && Arrays.equals(testData, readData)) { + System.out.println(" ✅ Destination file data is CORRECT"); + } else { + System.out.println(" ❌ Destination file data is CORRUPTED or INCOMPLETE"); + + // Show first difference + for (int i = 0; i < Math.min(totalRead, 1260); i++) { + if (testData[i] != readData[i]) { + System.out.println(" First difference at byte " + i); + System.out.println(" Expected: " + String.format("0x%02X", testData[i])); + System.out.println(" Got: " + String.format("0x%02X", readData[i])); + break; + } + } + } + } catch (Exception e) { + System.out.println(" ❌ Read FAILED: " + e.getMessage()); + e.printStackTrace(); + } + + // Clean up + fs.delete(destPath, false); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ TEST COMPLETE ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + } + + private String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X ", b)); + } + return sb.toString().trim(); + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java b/test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java new file mode 100644 index 000000000..092039042 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SimpleOneColumnTest.java @@ -0,0 +1,140 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.Test; + 
+import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Simplified test with only one column to isolate the EOF issue. + */ +public class SimpleOneColumnTest extends SparkTestBase { + + @Test + public void testSingleIntegerColumn() { + skipIfTestsDisabled(); + + // Clean up any previous test data + String tablePath = getTestPath("simple_data"); + try { + spark.read().parquet(tablePath); + // If we get here, path exists, so delete it + org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get( + new java.net.URI(tablePath), + spark.sparkContext().hadoopConfiguration()); + fs.delete(new org.apache.hadoop.fs.Path(tablePath), true); + } catch (Exception e) { + // Path doesn't exist, which is fine + } + + // Create simple data with just one integer column + List<SimpleData> data = Arrays.asList( + new SimpleData(1), + new SimpleData(2), + new SimpleData(3), + new SimpleData(4)); + + Dataset<Row> df = spark.createDataFrame(data, SimpleData.class); + + // Write to SeaweedFS + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + // Read back + Dataset<Row> readDf = spark.read().parquet(tablePath); + + // Simple count + assertEquals(4, readDf.count()); + + // Create view and query + readDf.createOrReplaceTempView("simple"); + + // Simple WHERE query + Dataset<Row> filtered = spark.sql("SELECT value FROM simple WHERE value > 2"); + assertEquals(2, filtered.count()); + + // Verify values + List<Row> results = filtered.collectAsList(); + assertTrue(results.stream().anyMatch(r -> r.getInt(0) == 3)); + assertTrue(results.stream().anyMatch(r -> r.getInt(0) == 4)); + } + + @Test + public void testSingleStringColumn() { + skipIfTestsDisabled(); + + // Create simple data with just one string column + List<StringData> data = Arrays.asList( + new StringData("Alice"), + new StringData("Bob"), + new StringData("Charlie"), + new StringData("David")); + + Dataset<Row> df = spark.createDataFrame(data, StringData.class); + + // Write to SeaweedFS + String tablePath = getTestPath("string_data"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + // Read back + Dataset<Row> readDf = spark.read().parquet(tablePath); + + // Simple count + assertEquals(4, readDf.count()); + + // Create view and query + readDf.createOrReplaceTempView("strings"); + + // Simple WHERE query + Dataset<Row> filtered = spark.sql("SELECT name FROM strings WHERE name LIKE 'A%'"); + assertEquals(1, filtered.count()); + + // Verify value + List<Row> results = filtered.collectAsList(); + assertEquals("Alice", results.get(0).getString(0)); + } + + // Test data classes + public static class SimpleData implements java.io.Serializable { + private int value; + + public SimpleData() { + } + + public SimpleData(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + } + + public static class StringData implements java.io.Serializable { + private String name; + + public StringData() { + } + + public StringData(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java new file mode 100644 index 000000000..d3fc1555c --- /dev/null +++ 
b/test/java/spark/src/test/java/seaweed/spark/SparkDataFrameWriteComparisonTest.java @@ -0,0 +1,363 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Compare Spark DataFrame.write().parquet() operations between + * local filesystem and SeaweedFS to identify the exact difference + * that causes the 78-byte EOF error. + */ +public class SparkDataFrameWriteComparisonTest extends SparkTestBase { + + private static class OperationLog { + List<String> operations = new ArrayList<>(); + + synchronized void log(String op) { + operations.add(op); + System.out.println(" " + op); + } + + void print(String title) { + System.out.println("\n" + title + " (" + operations.size() + " operations):"); + for (int i = 0; i < operations.size(); i++) { + System.out.printf(" [%3d] %s\n", i, operations.get(i)); + } + } + + void compare(OperationLog other, String name1, String name2) { + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ COMPARISON: " + name1 + " vs " + name2); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + int maxLen = Math.max(operations.size(), other.operations.size()); + int differences = 0; + + for (int i = 0; i < maxLen; i++) { + String op1 = i < operations.size() ? operations.get(i) : "<missing>"; + String op2 = i < other.operations.size() ? 
other.operations.get(i) : "<missing>"; + + // Normalize operation strings for comparison (remove file-specific parts) + String normalized1 = normalizeOp(op1); + String normalized2 = normalizeOp(op2); + + if (!normalized1.equals(normalized2)) { + differences++; + System.out.printf("\n[%3d] DIFFERENCE:\n", i); + System.out.println(" " + name1 + ": " + op1); + System.out.println(" " + name2 + ": " + op2); + } + } + + System.out.println("\n" + "=".repeat(64)); + if (differences == 0) { + System.out.println("✅ Operations are IDENTICAL!"); + } else { + System.out.println("❌ Found " + differences + " differences"); + } + System.out.println("=".repeat(64)); + } + + private String normalizeOp(String op) { + // Remove file-specific identifiers for comparison + return op.replaceAll("part-[0-9a-f-]+", "part-XXXXX") + .replaceAll("attempt_[0-9]+", "attempt_XXXXX") + .replaceAll("/tmp/[^/]+", "/tmp/XXXXX") + .replaceAll("test-local-[0-9]+", "test-local-XXXXX"); + } + } + + // Custom FileSystem wrapper that logs all operations + private static class LoggingFileSystem extends FilterFileSystem { + private final OperationLog log; + private final String name; + + public LoggingFileSystem(FileSystem fs, OperationLog log, String name) { + this.fs = fs; + this.log = log; + this.name = name; + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + log.log(String.format("%s CREATE: %s (bufferSize=%d)", name, f.getName(), bufferSize)); + FSDataOutputStream out = fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress); + return new LoggingOutputStream(out, log, name, f.getName()); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { + log.log(String.format("%s APPEND: %s (bufferSize=%d)", name, f.getName(), bufferSize)); + FSDataOutputStream out = fs.append(f, bufferSize, progress); + return new LoggingOutputStream(out, log, name, f.getName()); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + log.log(String.format("%s RENAME: %s → %s", name, src.getName(), dst.getName())); + return fs.rename(src, dst); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + log.log(String.format("%s DELETE: %s (recursive=%s)", name, f.getName(), recursive)); + return fs.delete(f, recursive); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + FileStatus[] result = fs.listStatus(f); + log.log(String.format("%s LISTSTATUS: %s (%d files)", name, f.getName(), result.length)); + return result; + } + + @Override + public void setWorkingDirectory(Path new_dir) { + fs.setWorkingDirectory(new_dir); + } + + @Override + public Path getWorkingDirectory() { + return fs.getWorkingDirectory(); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + log.log(String.format("%s MKDIRS: %s", name, f.getName())); + return fs.mkdirs(f, permission); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + FileStatus status = fs.getFileStatus(f); + log.log(String.format("%s GETFILESTATUS: %s (size=%d)", name, f.getName(), status.getLen())); + return status; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + log.log(String.format("%s OPEN: %s (bufferSize=%d)", name, f.getName(), bufferSize)); + return 
fs.open(f, bufferSize); + } + } + + private static class LoggingOutputStream extends FSDataOutputStream { + private final FSDataOutputStream delegate; + private final OperationLog log; + private final String name; + private final String filename; + private long writeCount = 0; + + public LoggingOutputStream(FSDataOutputStream delegate, OperationLog log, String name, String filename) throws IOException { + super(delegate.getWrappedStream(), null); + this.delegate = delegate; + this.log = log; + this.name = name; + this.filename = filename; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + writeCount++; + long posBefore = getPos(); + delegate.write(b, off, len); + long posAfter = getPos(); + + // Log significant writes and the last few writes (potential footer) + if (len >= 100 || writeCount <= 5 || (writeCount % 100 == 0)) { + log.log(String.format("%s WRITE #%d: %d bytes, pos %d→%d [%s]", + name, writeCount, len, posBefore, posAfter, filename)); + } + } + + @Override + public long getPos() { + long pos = delegate.getPos(); + return pos; + } + + @Override + public void flush() throws IOException { + log.log(String.format("%s FLUSH: pos=%d [%s]", name, getPos(), filename)); + delegate.flush(); + } + + @Override + public void close() throws IOException { + log.log(String.format("%s CLOSE: pos=%d, totalWrites=%d [%s]", + name, getPos(), writeCount, filename)); + delegate.close(); + } + + @Override + public void hflush() throws IOException { + log.log(String.format("%s HFLUSH: pos=%d [%s]", name, getPos(), filename)); + delegate.hflush(); + } + + @Override + public void hsync() throws IOException { + log.log(String.format("%s HSYNC: pos=%d [%s]", name, getPos(), filename)); + delegate.hsync(); + } + } + + @Test + public void testCompareSparkDataFrameWrite() throws Exception { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ SPARK DATAFRAME.WRITE() OPERATION COMPARISON TEST ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝\n"); + + // Create test data (4 rows - this is what causes the error) + List<Employee> employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000), + new Employee(3, "Charlie", "Engineering", 120000), + new Employee(4, "David", "Sales", 75000) + ); + + Dataset<Row> df = spark.createDataFrame(employees, Employee.class); + + OperationLog localLog = new OperationLog(); + OperationLog seaweedLog = new OperationLog(); + + // Test 1: Write to local filesystem with logging + System.out.println("=== Writing to LOCAL filesystem with Spark ==="); + String localPath = "/tmp/spark-local-test-" + System.currentTimeMillis(); + + try { + // Configure Spark to use our logging filesystem for local writes + Configuration localConf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(localConf); + LoggingFileSystem loggingLocalFs = new LoggingFileSystem(localFs, localLog, "LOCAL"); + + // Write using Spark + df.write().mode(SaveMode.Overwrite).parquet("file://" + localPath); + + System.out.println("✅ Local write completed"); + + // Check final file + FileStatus[] files = localFs.listStatus(new Path(localPath)); + for (FileStatus file : files) { + if (file.getPath().getName().endsWith(".parquet")) { + localLog.log(String.format("LOCAL FINAL FILE: %s (%d bytes)", + 
file.getPath().getName(), file.getLen())); + } + } + + } catch (Exception e) { + System.out.println("❌ Local write failed: " + e.getMessage()); + e.printStackTrace(); + } + + // Test 2: Write to SeaweedFS with logging + System.out.println("\n=== Writing to SEAWEEDFS with Spark ==="); + String seaweedPath = getTestPath("spark-seaweed-test"); + + try { + df.write().mode(SaveMode.Overwrite).parquet(seaweedPath); + + System.out.println("✅ SeaweedFS write completed"); + + // Check final file + Configuration seaweedConf = new Configuration(); + seaweedConf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + seaweedConf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + seaweedConf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT); + FileSystem seaweedFs = FileSystem.get( + java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), + seaweedConf); + + FileStatus[] files = seaweedFs.listStatus(new Path(seaweedPath)); + for (FileStatus file : files) { + if (file.getPath().getName().endsWith(".parquet")) { + seaweedLog.log(String.format("SEAWEED FINAL FILE: %s (%d bytes)", + file.getPath().getName(), file.getLen())); + } + } + + } catch (Exception e) { + System.out.println("❌ SeaweedFS write failed: " + e.getMessage()); + if (e.getMessage() != null && e.getMessage().contains("bytes left")) { + System.out.println("🎯 This is the 78-byte EOF error during WRITE!"); + } + e.printStackTrace(); + } + + // Test 3: Try reading both + System.out.println("\n=== Reading LOCAL file ==="); + try { + Dataset<Row> localDf = spark.read().parquet("file://" + localPath); + long count = localDf.count(); + System.out.println("✅ Local read successful: " + count + " rows"); + } catch (Exception e) { + System.out.println("❌ Local read failed: " + e.getMessage()); + } + + System.out.println("\n=== Reading SEAWEEDFS file ==="); + try { + Dataset<Row> seaweedDf = spark.read().parquet(seaweedPath); + long count = seaweedDf.count(); + System.out.println("✅ SeaweedFS read successful: " + count + " rows"); + } catch (Exception e) { + System.out.println("❌ SeaweedFS read failed: " + e.getMessage()); + if (e.getMessage() != null && e.getMessage().contains("bytes left")) { + System.out.println("🎯 This is the 78-byte EOF error during READ!"); + } + } + + // Print operation logs + localLog.print("LOCAL OPERATIONS"); + seaweedLog.print("SEAWEEDFS OPERATIONS"); + + // Compare + localLog.compare(seaweedLog, "LOCAL", "SEAWEEDFS"); + + System.out.println("\n=== Test Complete ==="); + } + + // Employee class for test data + public static class Employee implements java.io.Serializable { + private int id; + private String name; + private String department; + private int salary; + + public Employee() {} + + public Employee(int id, String name, String department, int salary) { + this.id = id; + this.name = name; + this.department = department; + this.salary = salary; + } + + public int getId() { return id; } + public void setId(int id) { this.id = id; } + public String getName() { return name; } + public void setName(String name) { this.name = name; } + public String getDepartment() { return department; } + public void setDepartment(String department) { this.department = department; } + public int getSalary() { return salary; } + public void setSalary(int salary) { this.salary = salary; } + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java new file mode 100644 index 000000000..1d1881563 --- /dev/null +++ 
b/test/java/spark/src/test/java/seaweed/spark/SparkLocalFileSystemTest.java @@ -0,0 +1,177 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Test Spark DataFrame.write() with LOCAL filesystem to see if the issue is SeaweedFS-specific. + * This is the CRITICAL test to determine if the 78-byte error occurs with local files. + */ +public class SparkLocalFileSystemTest extends SparkTestBase { + + private String localTestDir; + + @Before + public void setUp() throws Exception { + super.setUpSpark(); + localTestDir = "/tmp/spark-local-test-" + System.currentTimeMillis(); + new File(localTestDir).mkdirs(); + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ CRITICAL TEST: Spark DataFrame.write() to LOCAL filesystem ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + System.out.println("Local test directory: " + localTestDir); + } + + @After + public void tearDown() throws Exception { + // Clean up + if (localTestDir != null) { + deleteDirectory(new File(localTestDir)); + } + super.tearDownSpark(); + } + + @Test + public void testSparkWriteToLocalFilesystem() { + System.out.println("\n=== TEST: Write Parquet to Local Filesystem ==="); + + // Create test data (same as SparkSQLTest) + List<Employee> employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000), + new Employee(3, "Charlie", "Engineering", 120000), + new Employee(4, "David", "Sales", 75000)); + + Dataset<Row> df = spark.createDataFrame(employees, Employee.class); + + // Write to LOCAL filesystem using file:// protocol + String localPath = "file://" + localTestDir + "/employees"; + System.out.println("Writing to: " + localPath); + + try { + df.write().mode(SaveMode.Overwrite).parquet(localPath); + System.out.println("✅ Write completed successfully!"); + } catch (Exception e) { + System.err.println("❌ Write FAILED: " + e.getMessage()); + e.printStackTrace(); + fail("Write to local filesystem failed: " + e.getMessage()); + } + + // Now try to READ back + System.out.println("\n=== TEST: Read Parquet from Local Filesystem ==="); + System.out.println("Reading from: " + localPath); + + try { + Dataset<Row> employeesDf = spark.read().parquet(localPath); + employeesDf.createOrReplaceTempView("employees"); + + // Run SQL query + Dataset<Row> engineeringEmployees = spark.sql( + "SELECT name, salary FROM employees WHERE department = 'Engineering'"); + + long count = engineeringEmployees.count(); + System.out.println("✅ Read completed successfully! Found " + count + " engineering employees"); + + assertEquals("Should find 2 engineering employees", 2, count); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ ✅ SUCCESS! Local filesystem works perfectly! ║"); + System.out.println("║ This proves the issue is SeaweedFS-specific! 
║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + + } catch (Exception e) { + if (e.getMessage() != null && e.getMessage().contains("EOFException") && e.getMessage().contains("78 bytes")) { + System.err.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.err.println("║ ❌ CRITICAL: 78-byte error ALSO occurs with local files! ║"); + System.err.println("║ This proves the issue is NOT SeaweedFS-specific! ║"); + System.err.println("║ The issue is in Spark itself or our test setup! ║"); + System.err.println("╚══════════════════════════════════════════════════════════════╝"); + } + System.err.println("❌ Read FAILED: " + e.getMessage()); + e.printStackTrace(); + fail("Read from local filesystem failed: " + e.getMessage()); + } + } + + @Test + public void testSparkWriteReadMultipleTimes() { + System.out.println("\n=== TEST: Multiple Write/Read Cycles ==="); + + for (int i = 1; i <= 3; i++) { + System.out.println("\n--- Cycle " + i + " ---"); + + List<Employee> employees = Arrays.asList( + new Employee(i * 10 + 1, "Person" + (i * 10 + 1), "Dept" + i, 50000 + i * 10000), + new Employee(i * 10 + 2, "Person" + (i * 10 + 2), "Dept" + i, 60000 + i * 10000)); + + Dataset<Row> df = spark.createDataFrame(employees, Employee.class); + String localPath = "file://" + localTestDir + "/cycle" + i; + + // Write + df.write().mode(SaveMode.Overwrite).parquet(localPath); + System.out.println("✅ Cycle " + i + " write completed"); + + // Read back immediately + Dataset<Row> readDf = spark.read().parquet(localPath); + long count = readDf.count(); + System.out.println("✅ Cycle " + i + " read completed: " + count + " rows"); + + assertEquals("Should have 2 rows", 2, count); + } + + System.out.println("\n✅ All cycles completed successfully!"); + } + + private void deleteDirectory(File directory) { + if (directory.exists()) { + File[] files = directory.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } else { + file.delete(); + } + } + } + directory.delete(); + } + } + + // Employee class for testing + public static class Employee implements java.io.Serializable { + private int id; + private String name; + private String department; + private int salary; + + public Employee() {} + + public Employee(int id, String name, String department, int salary) { + this.id = id; + this.name = name; + this.department = department; + this.salary = salary; + } + + public int getId() { return id; } + public void setId(int id) { this.id = id; } + public String getName() { return name; } + public void setName(String name) { this.name = name; } + public String getDepartment() { return department; } + public void setDepartment(String department) { this.department = department; } + public int getSalary() { return salary; } + public void setSalary(int salary) { this.salary = salary; } + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java new file mode 100644 index 000000000..2fd3f4695 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkRawLocalFSTest.java @@ -0,0 +1,132 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import 
org.apache.spark.sql.SaveMode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Test Spark with Hadoop's RawLocalFileSystem to see if 78-byte error can be reproduced. + * This uses the EXACT same implementation as native local files. + */ +public class SparkRawLocalFSTest extends SparkTestBase { + + private Path testPath; + private FileSystem rawLocalFs; + + @Before + public void setUp() throws IOException { + if (!TESTS_ENABLED) { + return; + } + super.setUpSpark(); + + // Use RawLocalFileSystem explicitly + Configuration conf = new Configuration(); + rawLocalFs = new RawLocalFileSystem(); + rawLocalFs.initialize(java.net.URI.create("file:///"), conf); + + testPath = new Path("/tmp/spark-rawlocal-test-" + System.currentTimeMillis()); + rawLocalFs.delete(testPath, true); + rawLocalFs.mkdirs(testPath); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ CRITICAL TEST: Spark with RawLocalFileSystem ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + System.out.println("Test directory: " + testPath); + } + + @After + public void tearDown() throws IOException { + if (!TESTS_ENABLED) { + return; + } + if (rawLocalFs != null) { + rawLocalFs.delete(testPath, true); + rawLocalFs.close(); + } + super.tearDownSpark(); + } + + @Test + public void testSparkWithRawLocalFileSystem() throws IOException { + skipIfTestsDisabled(); + + System.out.println("\n=== TEST: Write Parquet using RawLocalFileSystem ==="); + + // Create test data (same as SparkSQLTest) + List<Employee> employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000), + new Employee(3, "Charlie", "Engineering", 120000), + new Employee(4, "David", "Sales", 75000)); + + Dataset<Row> df = spark.createDataFrame(employees, Employee.class); + + // CRITICAL: Use file:// prefix to force local filesystem + String outputPath = "file://" + testPath.toString() + "/employees"; + System.out.println("Writing to: " + outputPath); + + // Write using Spark (will use file:// scheme, which uses RawLocalFileSystem) + df.write().mode(SaveMode.Overwrite).parquet(outputPath); + + System.out.println("✅ Write completed successfully!"); + + // Verify by reading back + System.out.println("\n=== TEST: Read Parquet using RawLocalFileSystem ==="); + System.out.println("Reading from: " + outputPath); + Dataset<Row> employeesDf = spark.read().parquet(outputPath); + employeesDf.createOrReplaceTempView("employees"); + + // Run SQL queries + Dataset<Row> engineeringEmployees = spark.sql( + "SELECT name, salary FROM employees WHERE department = 'Engineering'"); + + long count = engineeringEmployees.count(); + assertEquals(2, count); + System.out.println("✅ Read completed successfully! Found " + count + " engineering employees"); + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ ✅ SUCCESS! RawLocalFileSystem works perfectly! 
║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝"); + } + + // Employee class for Spark DataFrame + public static class Employee implements java.io.Serializable { + private int id; + private String name; + private String department; + private int salary; + + public Employee() {} // Required for Spark + + public Employee(int id, String name, String department, int salary) { + this.id = id; + this.name = name; + this.department = department; + this.salary = salary; + } + + // Getters and Setters (required for Spark) + public int getId() { return id; } + public void setId(int id) { this.id = id; } + public String getName() { return name; } + public void setName(String name) { this.name = name; } + public String getDepartment() { return department; } + public void setDepartment(String department) { this.department = department; } + public int getSalary() { return salary; } + public void setSalary(int salary) { this.salary = salary; } + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java new file mode 100644 index 000000000..f93d43ce7 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkReadDirectParquetTest.java @@ -0,0 +1,194 @@ +package seaweed.spark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.*; + +/** + * Test if Spark can read a Parquet file that was written directly + * (not by Spark) to SeaweedFS. 
+ * + * This isolates whether the 78-byte EOF error is in: + * - Spark's WRITE path (if this test passes) + * - Spark's READ path (if this test also fails) + */ +public class SparkReadDirectParquetTest extends SparkTestBase { + + private static final String SCHEMA_STRING = + "message Employee { " + + " required int32 id; " + + " required binary name (UTF8); " + + " required int32 age; " + + "}"; + + private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(SCHEMA_STRING); + + @Test + public void testSparkReadDirectlyWrittenParquet() throws Exception { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ SPARK READS DIRECTLY-WRITTEN PARQUET FILE TEST ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝\n"); + + String testPath = getSeaweedFSPath("/direct-write-test/employees.parquet"); + + // Step 1: Write Parquet file directly (not via Spark) + System.out.println("=== Step 1: Writing Parquet file directly (bypassing Spark) ==="); + writeParquetFileDirect(testPath); + System.out.println("✅ File written successfully: " + testPath); + + // Step 2: Try to read it with Spark + System.out.println("\n=== Step 2: Reading file with Spark ==="); + try { + Dataset<Row> df = spark.read().parquet(testPath); + + System.out.println("Schema:"); + df.printSchema(); + + long count = df.count(); + System.out.println("Row count: " + count); + + System.out.println("\nData:"); + df.show(); + + // Verify data + assertEquals("Should have 3 rows", 3, count); + + System.out.println("\n✅ SUCCESS! Spark can read directly-written Parquet file!"); + System.out.println("✅ This proves the issue is in SPARK'S WRITE PATH, not read path!"); + + } catch (Exception e) { + System.out.println("\n❌ FAILED! 
Spark cannot read directly-written Parquet file!"); + System.out.println("Error: " + e.getMessage()); + + if (e.getMessage() != null && e.getMessage().contains("bytes left")) { + System.out.println("🎯 This is the 78-byte EOF error!"); + System.out.println("❌ This means the issue is in SPARK'S READ PATH!"); + } + + e.printStackTrace(); + throw e; + } + } + + @Test + public void testSparkWriteThenRead() throws Exception { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + return; + } + + System.out.println("\n╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ SPARK WRITES THEN READS PARQUET FILE TEST (BASELINE) ║"); + System.out.println("╚══════════════════════════════════════════════════════════════╝\n"); + + String testPath = getSeaweedFSPath("/spark-write-test/employees"); + + // Step 1: Write with Spark + System.out.println("=== Step 1: Writing Parquet file with Spark ==="); + spark.sql("CREATE TABLE IF NOT EXISTS test_employees (id INT, name STRING, age INT) USING parquet LOCATION '" + testPath + "'"); + spark.sql("INSERT INTO test_employees VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)"); + System.out.println("✅ File written by Spark"); + + // Step 2: Try to read it with Spark + System.out.println("\n=== Step 2: Reading file with Spark ==="); + try { + Dataset<Row> df = spark.read().parquet(testPath); + + System.out.println("Schema:"); + df.printSchema(); + + long count = df.count(); + System.out.println("Row count: " + count); + + System.out.println("\nData:"); + df.show(); + + assertEquals("Should have 3 rows", 3, count); + + System.out.println("\n✅ SUCCESS! Spark can read its own Parquet file!"); + + } catch (Exception e) { + System.out.println("\n❌ FAILED! 
Spark cannot read its own Parquet file!"); + System.out.println("Error: " + e.getMessage()); + + if (e.getMessage() != null && e.getMessage().contains("bytes left")) { + System.out.println("🎯 This is the 78-byte EOF error!"); + } + + e.printStackTrace(); + throw e; + } finally { + spark.sql("DROP TABLE IF EXISTS test_employees"); + } + } + + private void writeParquetFileDirect(String seaweedPath) throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + conf.set("fs.seaweed.filer.host", SEAWEEDFS_HOST); + conf.set("fs.seaweed.filer.port", SEAWEEDFS_PORT); + + FileSystem fs = FileSystem.get(java.net.URI.create("seaweedfs://" + SEAWEEDFS_HOST + ":" + SEAWEEDFS_PORT), conf); + Path path = new Path(seaweedPath); + + // Ensure parent directory exists + fs.mkdirs(path.getParent()); + + GroupWriteSupport.setSchema(SCHEMA, conf); + + try (ParquetWriter<Group> writer = org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(path) + .withConf(conf) + .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) + .build()) { + + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + + // Write same 3 rows as Spark test + System.out.println(" Writing row 1: id=1, name=Alice, age=30"); + Group group1 = factory.newGroup() + .append("id", 1) + .append("name", "Alice") + .append("age", 30); + writer.write(group1); + + System.out.println(" Writing row 2: id=2, name=Bob, age=25"); + Group group2 = factory.newGroup() + .append("id", 2) + .append("name", "Bob") + .append("age", 25); + writer.write(group2); + + System.out.println(" Writing row 3: id=3, name=Charlie, age=35"); + Group group3 = factory.newGroup() + .append("id", 3) + .append("name", "Charlie") + .append("age", 35); + writer.write(group3); + } + + // Verify file was written + org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(path); + System.out.println(" File size: " + status.getLen() + " bytes"); + + fs.close(); + } +} + diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java new file mode 100644 index 000000000..10ea1cd3a --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkReadWriteTest.java @@ -0,0 +1,239 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Integration tests for Spark read/write operations with SeaweedFS. 
+ */ +public class SparkReadWriteTest extends SparkTestBase { + + @Test + public void testWriteAndReadParquet() { + skipIfTestsDisabled(); + + // Create test data + List<Person> people = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25), + new Person("Charlie", 35)); + + Dataset<Row> df = spark.createDataFrame(people, Person.class); + + // Write to SeaweedFS + String outputPath = getTestPath("people.parquet"); + df.write().mode(SaveMode.Overwrite).parquet(outputPath); + + // Read back from SeaweedFS + Dataset<Row> readDf = spark.read().parquet(outputPath); + + // Verify + assertEquals(3, readDf.count()); + assertEquals(2, readDf.columns().length); + + List<Row> results = readDf.collectAsList(); + assertTrue(results.stream().anyMatch(r -> "Alice".equals(r.getAs("name")) && (Integer) r.getAs("age") == 30)); + assertTrue(results.stream().anyMatch(r -> "Bob".equals(r.getAs("name")) && (Integer) r.getAs("age") == 25)); + assertTrue(results.stream().anyMatch(r -> "Charlie".equals(r.getAs("name")) && (Integer) r.getAs("age") == 35)); + } + + @Test + public void testWriteAndReadCSV() { + skipIfTestsDisabled(); + + // Create test data + List<Person> people = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25)); + + Dataset<Row> df = spark.createDataFrame(people, Person.class); + + // Write to SeaweedFS as CSV + String outputPath = getTestPath("people.csv"); + df.write().mode(SaveMode.Overwrite).option("header", "true").csv(outputPath); + + // Read back from SeaweedFS + Dataset<Row> readDf = spark.read().option("header", "true").option("inferSchema", "true").csv(outputPath); + + // Verify + assertEquals(2, readDf.count()); + assertEquals(2, readDf.columns().length); + } + + @Test + public void testWriteAndReadJSON() { + skipIfTestsDisabled(); + + // Create test data + List<Person> people = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25), + new Person("Charlie", 35)); + + Dataset<Row> df = spark.createDataFrame(people, Person.class); + + // Write to SeaweedFS as JSON + String outputPath = getTestPath("people.json"); + df.write().mode(SaveMode.Overwrite).json(outputPath); + + // Read back from SeaweedFS + Dataset<Row> readDf = spark.read().json(outputPath); + + // Verify + assertEquals(3, readDf.count()); + assertEquals(2, readDf.columns().length); + } + + @Test + public void testWritePartitionedData() { + skipIfTestsDisabled(); + + // Create test data with multiple years + List<PersonWithYear> people = Arrays.asList( + new PersonWithYear("Alice", 30, 2020), + new PersonWithYear("Bob", 25, 2021), + new PersonWithYear("Charlie", 35, 2020), + new PersonWithYear("David", 28, 2021)); + + Dataset<Row> df = spark.createDataFrame(people, PersonWithYear.class); + + // Write partitioned data to SeaweedFS + String outputPath = getTestPath("people_partitioned"); + df.write().mode(SaveMode.Overwrite).partitionBy("year").parquet(outputPath); + + // Read back from SeaweedFS + Dataset<Row> readDf = spark.read().parquet(outputPath); + + // Verify + assertEquals(4, readDf.count()); + + // Verify partition filtering works + Dataset<Row> filtered2020 = readDf.filter("year = 2020"); + assertEquals(2, filtered2020.count()); + + Dataset<Row> filtered2021 = readDf.filter("year = 2021"); + assertEquals(2, filtered2021.count()); + } + + @Test + public void testAppendMode() { + skipIfTestsDisabled(); + + String outputPath = getTestPath("people_append.parquet"); + + // Write first batch + List<Person> batch1 = Arrays.asList( + new Person("Alice", 30), + new Person("Bob", 25)); + 
Dataset<Row> df1 = spark.createDataFrame(batch1, Person.class); + df1.write().mode(SaveMode.Overwrite).parquet(outputPath); + + // Append second batch + List<Person> batch2 = Arrays.asList( + new Person("Charlie", 35), + new Person("David", 28)); + Dataset<Row> df2 = spark.createDataFrame(batch2, Person.class); + df2.write().mode(SaveMode.Append).parquet(outputPath); + + // Read back and verify + Dataset<Row> readDf = spark.read().parquet(outputPath); + assertEquals(4, readDf.count()); + } + + @Test + public void testLargeDataset() { + skipIfTestsDisabled(); + + // Create a larger dataset + Dataset<Row> largeDf = spark.range(0, 10000) + .selectExpr("id as value", "id * 2 as doubled"); + + String outputPath = getTestPath("large_dataset.parquet"); + largeDf.write().mode(SaveMode.Overwrite).parquet(outputPath); + + // Read back and verify + Dataset<Row> readDf = spark.read().parquet(outputPath); + assertEquals(10000, readDf.count()); + + // Verify some data (sort to ensure deterministic order) + Row firstRow = readDf.orderBy("value").first(); + assertEquals(0L, firstRow.getLong(0)); + assertEquals(0L, firstRow.getLong(1)); + } + + // Test data classes + public static class Person implements java.io.Serializable { + private String name; + private int age; + + public Person() { + } + + public Person(String name, int age) { + this.name = name; + this.age = age; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + } + + public static class PersonWithYear implements java.io.Serializable { + private String name; + private int age; + private int year; + + public PersonWithYear() { + } + + public PersonWithYear(String name, int age, int year) { + this.name = name; + this.age = age; + this.year = year; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public int getYear() { + return year; + } + + public void setYear(int year) { + this.year = year; + } + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java new file mode 100644 index 000000000..231952023 --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkSQLTest.java @@ -0,0 +1,278 @@ +package seaweed.spark; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Integration tests for Spark SQL operations with SeaweedFS. 
+ */ +public class SparkSQLTest extends SparkTestBase { + + @Test + public void testCreateTableAndQuery() { + skipIfTestsDisabled(); + + // Create test data + List<Employee> employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000), + new Employee(3, "Charlie", "Engineering", 120000), + new Employee(4, "David", "Sales", 75000)); + + Dataset<Row> df = spark.createDataFrame(employees, Employee.class); + + // Write to SeaweedFS + String tablePath = getTestPath("employees"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + // Create temporary view + Dataset<Row> employeesDf = spark.read().parquet(tablePath); + employeesDf.createOrReplaceTempView("employees"); + + // Run SQL queries + Dataset<Row> engineeringEmployees = spark.sql( + "SELECT name, salary FROM employees WHERE department = 'Engineering'"); + + assertEquals(2, engineeringEmployees.count()); + + Dataset<Row> highPaidEmployees = spark.sql( + "SELECT name, salary FROM employees WHERE salary > 90000"); + + assertEquals(2, highPaidEmployees.count()); + } + + @Test + public void testAggregationQueries() { + skipIfTestsDisabled(); + + // Create sales data + List<Sale> sales = Arrays.asList( + new Sale("2024-01", "Product A", 100), + new Sale("2024-01", "Product B", 150), + new Sale("2024-02", "Product A", 120), + new Sale("2024-02", "Product B", 180), + new Sale("2024-03", "Product A", 110)); + + Dataset<Row> df = spark.createDataFrame(sales, Sale.class); + + // Write to SeaweedFS + String tablePath = getTestPath("sales"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + // Create temporary view + Dataset<Row> salesDf = spark.read().parquet(tablePath); + salesDf.createOrReplaceTempView("sales"); + + // Aggregate query + Dataset<Row> monthlySales = spark.sql( + "SELECT month, SUM(amount) as total FROM sales GROUP BY month ORDER BY month"); + + List<Row> results = monthlySales.collectAsList(); + assertEquals(3, results.size()); + assertEquals("2024-01", results.get(0).getString(0)); + assertEquals(250, results.get(0).getLong(1)); + } + + @Test + public void testJoinOperations() { + skipIfTestsDisabled(); + + // Create employee data + List<Employee> employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Sales", 80000)); + + // Create department data + List<Department> departments = Arrays.asList( + new Department("Engineering", "Building Products"), + new Department("Sales", "Selling Products")); + + Dataset<Row> empDf = spark.createDataFrame(employees, Employee.class); + Dataset<Row> deptDf = spark.createDataFrame(departments, Department.class); + + // Write to SeaweedFS + String empPath = getTestPath("employees_join"); + String deptPath = getTestPath("departments_join"); + + empDf.write().mode(SaveMode.Overwrite).parquet(empPath); + deptDf.write().mode(SaveMode.Overwrite).parquet(deptPath); + + // Read back and create views + spark.read().parquet(empPath).createOrReplaceTempView("emp"); + spark.read().parquet(deptPath).createOrReplaceTempView("dept"); + + // Join query + Dataset<Row> joined = spark.sql( + "SELECT e.name, e.salary, d.description " + + "FROM emp e JOIN dept d ON e.department = d.name"); + + assertEquals(2, joined.count()); + + List<Row> results = joined.collectAsList(); + assertTrue(results.stream() + .anyMatch(r -> "Alice".equals(r.getString(0)) && "Building Products".equals(r.getString(2)))); + } + + @Test + public void testWindowFunctions() { + skipIfTestsDisabled(); + + // Create 
employee data with salaries + List<Employee> employees = Arrays.asList( + new Employee(1, "Alice", "Engineering", 100000), + new Employee(2, "Bob", "Engineering", 120000), + new Employee(3, "Charlie", "Sales", 80000), + new Employee(4, "David", "Sales", 90000)); + + Dataset<Row> df = spark.createDataFrame(employees, Employee.class); + + String tablePath = getTestPath("employees_window"); + df.write().mode(SaveMode.Overwrite).parquet(tablePath); + + Dataset<Row> employeesDf = spark.read().parquet(tablePath); + employeesDf.createOrReplaceTempView("employees_ranked"); + + // Window function query - rank employees by salary within department + Dataset<Row> ranked = spark.sql( + "SELECT name, department, salary, " + + "RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank " + + "FROM employees_ranked"); + + assertEquals(4, ranked.count()); + + // Verify Bob has rank 1 in Engineering (highest salary) + List<Row> results = ranked.collectAsList(); + Row bobRow = results.stream() + .filter(r -> "Bob".equals(r.getString(0))) + .findFirst() + .orElse(null); + + assertNotNull(bobRow); + assertEquals(1, bobRow.getInt(3)); + } + + // Test data classes + public static class Employee implements java.io.Serializable { + private int id; + private String name; + private String department; + private int salary; + + public Employee() { + } + + public Employee(int id, String name, String department, int salary) { + this.id = id; + this.name = name; + this.department = department; + this.salary = salary; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDepartment() { + return department; + } + + public void setDepartment(String department) { + this.department = department; + } + + public int getSalary() { + return salary; + } + + public void setSalary(int salary) { + this.salary = salary; + } + } + + public static class Sale implements java.io.Serializable { + private String month; + private String product; + private int amount; + + public Sale() { + } + + public Sale(String month, String product, int amount) { + this.month = month; + this.product = product; + this.amount = amount; + } + + public String getMonth() { + return month; + } + + public void setMonth(String month) { + this.month = month; + } + + public String getProduct() { + return product; + } + + public void setProduct(String product) { + this.product = product; + } + + public int getAmount() { + return amount; + } + + public void setAmount(int amount) { + this.amount = amount; + } + } + + public static class Department implements java.io.Serializable { + private String name; + private String description; + + public Department() { + } + + public Department(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + } +} diff --git a/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java new file mode 100644 index 000000000..5b17e6f2d --- /dev/null +++ b/test/java/spark/src/test/java/seaweed/spark/SparkTestBase.java @@ -0,0 +1,128 @@ +package seaweed.spark; + +import 
org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +/** + * Base class for Spark integration tests with SeaweedFS. + * + * These tests require a running SeaweedFS cluster. + * Set environment variable SEAWEEDFS_TEST_ENABLED=true to enable these tests. + */ +public abstract class SparkTestBase { + + protected SparkSession spark; + protected static final String TEST_ROOT = "/test-spark"; + protected static final boolean TESTS_ENABLED = "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); + + // SeaweedFS connection settings + protected static final String SEAWEEDFS_HOST = System.getenv().getOrDefault("SEAWEEDFS_FILER_HOST", "localhost"); + protected static final String SEAWEEDFS_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_PORT", "8888"); + protected static final String SEAWEEDFS_GRPC_PORT = System.getenv().getOrDefault("SEAWEEDFS_FILER_GRPC_PORT", + "18888"); + + @Before + public void setUpSpark() throws IOException { + if (!TESTS_ENABLED) { + return; + } + + SparkConf sparkConf = new SparkConf() + .setAppName("SeaweedFS Integration Test") + .setMaster("local[1]") // Single thread to avoid concurrent gRPC issues + .set("spark.driver.host", "localhost") + .set("spark.sql.warehouse.dir", getSeaweedFSPath("/spark-warehouse")) + // SeaweedFS configuration + .set("spark.hadoop.fs.defaultFS", String.format("seaweedfs://%s:%s", SEAWEEDFS_HOST, SEAWEEDFS_PORT)) + .set("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem") + .set("spark.hadoop.fs.seaweed.impl", "seaweed.hdfs.SeaweedFileSystem") + .set("spark.hadoop.fs.seaweed.filer.host", SEAWEEDFS_HOST) + .set("spark.hadoop.fs.seaweed.filer.port", SEAWEEDFS_PORT) + .set("spark.hadoop.fs.seaweed.filer.port.grpc", SEAWEEDFS_GRPC_PORT) + .set("spark.hadoop.fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem") + // Set replication to empty string to use filer default + .set("spark.hadoop.fs.seaweed.replication", "") + // Smaller buffer to reduce load + .set("spark.hadoop.fs.seaweed.buffer.size", "1048576") // 1MB + // Reduce parallelism + .set("spark.default.parallelism", "1") + .set("spark.sql.shuffle.partitions", "1") + // Simpler output committer + .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") + .set("spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") + // Disable speculative execution to reduce load + .set("spark.speculation", "false") + // Increase task retry to handle transient consistency issues + .set("spark.task.maxFailures", "4") + // Wait longer before retrying failed tasks + .set("spark.task.reaper.enabled", "true") + .set("spark.task.reaper.pollingInterval", "1s"); + + spark = SparkSession.builder() + .config(sparkConf) + .getOrCreate(); + + // Clean up test directory + cleanupTestDirectory(); + } + + @After + public void tearDownSpark() { + if (!TESTS_ENABLED || spark == null) { + return; + } + + try { + // Try to cleanup but don't fail if it doesn't work + cleanupTestDirectory(); + } catch (Exception e) { + System.err.println("Cleanup failed: " + e.getMessage()); + } finally { + try { + spark.stop(); + } catch (Exception e) { + System.err.println("Spark stop failed: " + e.getMessage()); + } + spark = null; + } + } + + protected String getSeaweedFSPath(String path) { + return String.format("seaweedfs://%s:%s%s", SEAWEEDFS_HOST, 
SEAWEEDFS_PORT, path); + } + + protected String getTestPath(String subPath) { + return getSeaweedFSPath(TEST_ROOT + "/" + subPath); + } + + private void cleanupTestDirectory() { + if (spark != null) { + try { + Configuration conf = spark.sparkContext().hadoopConfiguration(); + org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get( + java.net.URI.create(getSeaweedFSPath("/")), conf); + org.apache.hadoop.fs.Path testPath = new org.apache.hadoop.fs.Path(TEST_ROOT); + if (fs.exists(testPath)) { + fs.delete(testPath, true); + } + } catch (Exception e) { + // Suppress cleanup errors - they shouldn't fail tests + // Common in distributed systems with eventual consistency + System.err.println("Warning: cleanup failed (non-critical): " + e.getMessage()); + } + } + } + + protected void skipIfTestsDisabled() { + if (!TESTS_ENABLED) { + System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); + org.junit.Assume.assumeTrue("SEAWEEDFS_TEST_ENABLED not set", false); + } + } +} diff --git a/test/java/spark/src/test/resources/log4j.properties b/test/java/spark/src/test/resources/log4j.properties new file mode 100644 index 000000000..0a603e1b0 --- /dev/null +++ b/test/java/spark/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Set root logger level +log4j.rootLogger=WARN, console + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Set log levels for specific packages +log4j.logger.org.apache.spark=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.parquet=WARN +log4j.logger.seaweed=INFO + +# Suppress unnecessary warnings +log4j.logger.org.apache.spark.util.Utils=ERROR +log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR + diff --git a/test/kafka/go.mod b/test/kafka/go.mod index b0f66885f..f0a99e7fe 100644 --- a/test/kafka/go.mod +++ b/test/kafka/go.mod @@ -10,7 +10,7 @@ require ( github.com/seaweedfs/seaweedfs v0.0.0-00010101000000-000000000000 github.com/segmentio/kafka-go v0.4.49 github.com/stretchr/testify v1.11.1 - google.golang.org/grpc v1.75.1 + google.golang.org/grpc v1.77.0 ) replace github.com/seaweedfs/seaweedfs => ../../ @@ -18,7 +18,7 @@ replace github.com/seaweedfs/seaweedfs => ../../ require ( cloud.google.com/go/auth v0.16.5 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect - cloud.google.com/go/compute/metadata v0.8.0 // indirect + cloud.google.com/go/compute/metadata v0.9.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect @@ -95,7 +95,7 @@ require ( github.com/geoffgarside/ber v1.2.0 // indirect github.com/go-chi/chi/v5 v5.2.2 // indirect github.com/go-darwin/apfs v0.0.0-20211011131704-f84b94dbf348 // indirect - github.com/go-jose/go-jose/v4 v4.1.1 // indirect + github.com/go-jose/go-jose/v4 v4.1.3 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect @@ -200,7 +200,7 @@ require ( github.com/spf13/cast v1.10.0 // indirect github.com/spf13/pflag v1.0.10 // indirect github.com/spf13/viper v1.21.0 // indirect - github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect + github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect github.com/subosito/gotenv v1.6.0 // 
indirect github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 // indirect github.com/t3rm1n4l/go-mega v0.0.0-20250926104142-ccb8d3498e6c // indirect @@ -222,27 +222,27 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.etcd.io/bbolt v1.4.2 // indirect go.mongodb.org/mongo-driver v1.17.6 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect - go.opentelemetry.io/otel v1.37.0 // indirect - go.opentelemetry.io/otel/metric v1.37.0 // indirect - go.opentelemetry.io/otel/trace v1.37.0 // indirect + go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/exp v0.0.0-20250811191247-51f88131bc50 // indirect golang.org/x/image v0.33.0 // indirect golang.org/x/net v0.47.0 // indirect - golang.org/x/oauth2 v0.30.0 // indirect + golang.org/x/oauth2 v0.32.0 // indirect golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.12.0 // indirect google.golang.org/api v0.247.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect google.golang.org/grpc/security/advancedtls v1.0.0 // indirect - google.golang.org/protobuf v1.36.9 // indirect + google.golang.org/protobuf v1.36.10 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/validator.v2 v2.0.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/test/kafka/go.sum b/test/kafka/go.sum index 3295407b4..09553d9d4 100644 --- a/test/kafka/go.sum +++ b/test/kafka/go.sum @@ -23,8 +23,8 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute/metadata v0.8.0 h1:HxMRIbao8w17ZX6wBnjhcDkW6lTFpgcaobyVfZWqRLA= -cloud.google.com/go/compute/metadata v0.8.0/go.mod h1:sYOGTp851OV9bOFJ9CH7elVvyzopvWQFNNghtDQ/Biw= +cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= +cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -248,8 +248,8 @@ github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3Bop github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-jose/go-jose/v4 v4.1.1 
h1:JYhSgy4mXXzAdF3nUx3ygx347LRXJRrpgyU3adRmkAI= -github.com/go-jose/go-jose/v4 v4.1.1/go.mod h1:BdsZGqgdO3b6tTc6LSE56wcDbMMLuPsw5d4ZD5f94kA= +github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= +github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -597,8 +597,8 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= -github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE= -github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g= +github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= +github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -692,20 +692,20 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/XLML9ElpiHVDNwvqI0hIFlzV8dgIr35kV1kRU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:NfchwuyNoMcZ5MLHwPrODwUF1HWCXWrL31s8gSAdIKY= -go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= -go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= -go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= -go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= -go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= -go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= -go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= -go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= -go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= -go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod 
h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= @@ -823,8 +823,8 @@ golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= -golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY= +golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1042,10 +1042,10 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 h1:Nt6z9UHqSlIdIGJdz6KhTIs2VRx/iOsA5iE8bmQNcxs= google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79/go.mod h1:kTmlBHMPqR5uCZPBvwa2B18mvubkjyY3CRLI0c6fj0s= -google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c h1:AtEkQdl5b6zsybXcbz00j1LwNodDuH6hVifIaNqk7NQ= -google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c/go.mod h1:ea2MjsO70ssTfCjiwHgI0ZFqcw45Ksuk2ckf9G468GA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c h1:qXWI/sQtv5UKboZ/zUk7h+mrf/lXORyI+n9DKDAusdg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 h1:mepRgnBZa07I4TRuomDE4sTIYieg/osKmzIf4USdWS4= +google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod 
h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1058,10 +1058,10 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= -google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= -google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20 h1:MLBCGN1O7GzIx+cBiwfYPwtmZ41U3Mn/cotLJciaArI= -google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20/go.mod h1:Nr5H8+MlGWr5+xX/STzdoEqJrO+YteqFbMyCsrb6mH0= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= +google.golang.org/grpc/examples v0.0.0-20250407062114-b368379ef8f6 h1:ExN12ndbJ608cboPYflpTny6mXSzPrDLh0iTaVrRrds= +google.golang.org/grpc/examples v0.0.0-20250407062114-b368379ef8f6/go.mod h1:6ytKWczdvnpnO+m+JiG9NjEDzR1FJfsnmJdG7B8QVZ8= google.golang.org/grpc/security/advancedtls v1.0.0 h1:/KQ7VP/1bs53/aopk9QhuPyFAp9Dm9Ejix3lzYkCrDA= google.golang.org/grpc/security/advancedtls v1.0.0/go.mod h1:o+s4go+e1PJ2AjuQMY5hU82W7lDlefjJA6FqEHRVHWk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -1074,8 +1074,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= -google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= |
