Diffstat (limited to 'test/java/spark/src/main')
-rw-r--r--  test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java | 138
1 file changed, 138 insertions, 0 deletions
diff --git a/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java
new file mode 100644
index 000000000..75b2d710b
--- /dev/null
+++ b/test/java/spark/src/main/java/seaweed/spark/SparkSeaweedFSExample.java
@@ -0,0 +1,138 @@
+package seaweed.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Example Spark application demonstrating SeaweedFS integration.
+ *
+ * This can be submitted to a Spark cluster using spark-submit.
+ *
+ * Example usage:
+ * spark-submit \
+ * --class seaweed.spark.SparkSeaweedFSExample \
+ * --master local[2] \
+ * --conf spark.hadoop.fs.seaweedfs.impl=seaweed.hdfs.SeaweedFileSystem \
+ * --conf spark.hadoop.fs.seaweed.filer.host=localhost \
+ * --conf spark.hadoop.fs.seaweed.filer.port=8888 \
+ * --conf spark.hadoop.fs.seaweed.filer.port.grpc=18888 \
+ * target/seaweedfs-spark-integration-tests-1.0-SNAPSHOT.jar \
+ * seaweedfs://localhost:8888/output
+ */
+public class SparkSeaweedFSExample {
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.err.println("Usage: SparkSeaweedFSExample <output-path>");
+ System.err.println("Example: seaweedfs://localhost:8888/spark-output");
+ System.exit(1);
+ }
+
+ String outputPath = args[0];
+
+ // Create Spark session
+ SparkSession spark = SparkSession.builder()
+ .appName("SeaweedFS Spark Example")
+ .getOrCreate();
+
+ try {
+ System.out.println("=== SeaweedFS Spark Integration Example ===\n");
+
+            // Step 1: Generate sample data (written to SeaweedFS in step 2)
+ System.out.println("1. Generating sample data...");
+ Dataset<Row> data = spark.range(0, 1000)
+ .selectExpr(
+ "id",
+ "id * 2 as doubled",
+ "CAST(rand() * 100 AS INT) as random_value");
+
+ System.out.println(" Generated " + data.count() + " rows");
+ data.show(5);
+
+ // Write as Parquet
+ String parquetPath = outputPath + "/data.parquet";
+ System.out.println("\n2. Writing data to SeaweedFS as Parquet...");
+ System.out.println(" Path: " + parquetPath);
+
+ data.write()
+ .mode(SaveMode.Overwrite)
+ .parquet(parquetPath);
+
+ System.out.println(" ✓ Write completed");
+
+ // Read back and verify
+ System.out.println("\n3. Reading data back from SeaweedFS...");
+ Dataset<Row> readData = spark.read().parquet(parquetPath);
+ System.out.println(" Read " + readData.count() + " rows");
+
+ // Perform aggregation
+ System.out.println("\n4. Performing aggregation...");
+ Dataset<Row> stats = readData.selectExpr(
+ "COUNT(*) as count",
+ "AVG(random_value) as avg_random",
+ "MAX(doubled) as max_doubled");
+
+ stats.show();
+
+ // Write aggregation results
+ String statsPath = outputPath + "/stats.parquet";
+ System.out.println("5. Writing stats to: " + statsPath);
+ stats.write()
+ .mode(SaveMode.Overwrite)
+ .parquet(statsPath);
+
+ // Create a partitioned dataset
+ System.out.println("\n6. Creating partitioned dataset...");
+ Dataset<Row> partitionedData = data.selectExpr(
+ "*",
+ "CAST(id % 10 AS INT) as partition_key");
+
+ String partitionedPath = outputPath + "/partitioned.parquet";
+ System.out.println(" Path: " + partitionedPath);
+
+ partitionedData.write()
+ .mode(SaveMode.Overwrite)
+ .partitionBy("partition_key")
+ .parquet(partitionedPath);
+
+ System.out.println(" ✓ Partitioned write completed");
+
+ // Read specific partition
+ System.out.println("\n7. Reading specific partition (partition_key=0)...");
+ Dataset<Row> partition0 = spark.read()
+ .parquet(partitionedPath)
+ .filter("partition_key = 0");
+
+ System.out.println(" Partition 0 contains " + partition0.count() + " rows");
+ partition0.show(5);
+
+ // SQL example
+ System.out.println("\n8. Using Spark SQL...");
+ readData.createOrReplaceTempView("seaweedfs_data");
+
+ Dataset<Row> sqlResult = spark.sql(
+ "SELECT " +
+ " CAST(id / 100 AS INT) as bucket, " +
+ " COUNT(*) as count, " +
+ " AVG(random_value) as avg_random " +
+ "FROM seaweedfs_data " +
+ "GROUP BY CAST(id / 100 AS INT) " +
+ "ORDER BY bucket");
+
+ System.out.println(" Bucketed statistics:");
+ sqlResult.show();
+
+ System.out.println("\n=== Example completed successfully! ===");
+ System.out.println("Output location: " + outputPath);
+
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ } finally {
+ spark.stop();
+ }
+ }
+}
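
Note: the spark-submit command in the file header passes all SeaweedFS settings as --conf flags. The same Hadoop properties can also be set programmatically when the session is built. Below is a minimal sketch, assuming the SeaweedFS Hadoop client jar (which provides seaweed.hdfs.SeaweedFileSystem) is on the Spark classpath; the property names and values are copied from the javadoc above, while the class name and output path are purely illustrative:

    package seaweed.spark;

    import org.apache.spark.sql.SparkSession;

    /**
     * Minimal sketch (not part of the change above): sets the same
     * SeaweedFS properties in code instead of via --conf flags.
     */
    public class SparkSeaweedFSConfigSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("SeaweedFS programmatic config")
                    .master("local[2]")
                    // Same keys as the --conf flags in SparkSeaweedFSExample's javadoc.
                    .config("spark.hadoop.fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem")
                    .config("spark.hadoop.fs.seaweed.filer.host", "localhost")
                    .config("spark.hadoop.fs.seaweed.filer.port", "8888")
                    .config("spark.hadoop.fs.seaweed.filer.port.grpc", "18888")
                    .getOrCreate();

            // Any seaweedfs:// URI now resolves through the filer configured above.
            spark.range(0, 10).write().parquet("seaweedfs://localhost:8888/config-sketch");
            spark.stop();
        }
    }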