#!/usr/bin/env python3
# /// script
# dependencies = [
#     "pyarrow>=22",
#     "boto3>=1.28.0",
# ]
# ///

"""
Simple example of using PyArrow's native S3 filesystem with SeaweedFS.

This is a minimal example demonstrating how to write and read Parquet files
using PyArrow's built-in S3FileSystem, with no filesystem shim such as s3fs
(boto3 is used only to create the test bucket if it does not already exist).

Usage:
    # Set environment variables
    export S3_ENDPOINT_URL=localhost:8333
    export S3_ACCESS_KEY=some_access_key1
    export S3_SECRET_KEY=some_secret_key1
    export BUCKET_NAME=test-parquet-bucket

    # Run the script
    python3 example_pyarrow_native.py
    
    # Or run with uv (if available)
    uv run example_pyarrow_native.py
"""

import os
import secrets

import pyarrow as pa
import pyarrow.dataset as pads
import pyarrow.fs as pafs
import pyarrow.parquet as pq

from parquet_test_utils import create_sample_table

# Configuration
BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "localhost:8333")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "some_access_key1")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "some_secret_key1")

# Determine scheme from endpoint
if S3_ENDPOINT_URL.startswith("http://"):
    scheme = "http"
    endpoint = S3_ENDPOINT_URL[7:]
elif S3_ENDPOINT_URL.startswith("https://"):
    scheme = "https"
    endpoint = S3_ENDPOINT_URL[8:]
else:
    scheme = "http"  # Default to http for localhost
    endpoint = S3_ENDPOINT_URL

print(f"Connecting to S3 endpoint: {scheme}://{endpoint}")

# Initialize PyArrow's NATIVE S3 filesystem
s3 = pafs.S3FileSystem(
    access_key=S3_ACCESS_KEY,
    secret_key=S3_SECRET_KEY,
    endpoint_override=endpoint,
    scheme=scheme,
    allow_bucket_creation=True,
    allow_bucket_deletion=True,
)

print("✓ Connected to S3 endpoint")


# Create bucket if needed (using boto3)
try:
    import boto3
    from botocore.exceptions import ClientError
    
    s3_client = boto3.client(
        's3',
        endpoint_url=f"{scheme}://{endpoint}",
        aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY,
        region_name='us-east-1',
    )
    
    try:
        s3_client.head_bucket(Bucket=BUCKET_NAME)
        print(f"✓ Bucket exists: {BUCKET_NAME}")
    except ClientError as e:
        if e.response['Error']['Code'] in ('404', 'NoSuchBucket'):
            print(f"Creating bucket: {BUCKET_NAME}")
            s3_client.create_bucket(Bucket=BUCKET_NAME)
            print(f"✓ Bucket created: {BUCKET_NAME}")
        else:
            raise
except ImportError:
    print("Warning: boto3 not available, assuming bucket exists")

# Generate a unique dataset path (pads.write_dataset treats this as a base
# directory and writes its part files beneath it)
filename = f"{BUCKET_NAME}/dataset-{secrets.token_hex(8)}/test.parquet"

print(f"\nWriting Parquet dataset to: {filename}")

# Write dataset
table = create_sample_table(200_000)
pads.write_dataset(
    table,
    filename,
    filesystem=s3,
    format="parquet",
)

print(f"✓ Wrote {table.num_rows:,} rows")

# Read with pq.read_table
print("\nReading with pq.read_table...")
table_read = pq.read_table(filename, filesystem=s3)
print(f"✓ Read {table_read.num_rows:,} rows")

# Read with pq.ParquetDataset
print("\nReading with pq.ParquetDataset...")
dataset = pq.ParquetDataset(filename, filesystem=s3)
table_dataset = dataset.read()
print(f"✓ Read {table_dataset.num_rows:,} rows")

# Read with pads.dataset
print("\nReading with pads.dataset...")
dataset_pads = pads.dataset(filename, filesystem=s3)
table_pads = dataset_pads.to_table()
print(f"✓ Read {table_pads.num_rows:,} rows")

print("\n✅ All operations completed successfully!")
print(f"\nFile written to: {filename}")
print("You can verify the file using the SeaweedFS S3 API or weed shell")