1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
|
#!/usr/bin/env python3
"""
Cross-filesystem compatibility tests for PyArrow Parquet files.
This test verifies that Parquet files written using one filesystem implementation
(s3fs or PyArrow native S3) can be correctly read using the other implementation.
Test Matrix:
- Write with s3fs → Read with PyArrow native S3
- Write with PyArrow native S3 → Read with s3fs
Requirements:
- pyarrow>=22.0.0
- s3fs>=2024.12.0
- boto3>=1.40.0
Environment Variables:
S3_ENDPOINT_URL: S3 endpoint (default: http://localhost:8333)
S3_ACCESS_KEY: S3 access key (default: some_access_key1)
S3_SECRET_KEY: S3 secret key (default: some_secret_key1)
BUCKET_NAME: S3 bucket name (default: test-parquet-bucket)
TEST_QUICK: Run only small/quick tests (default: 0, set to 1 for quick mode)
Usage:
# Run with default environment variables
python3 test_cross_filesystem_compatibility.py
# Run with custom environment variables
S3_ENDPOINT_URL=http://localhost:8333 \
S3_ACCESS_KEY=mykey \
S3_SECRET_KEY=mysecret \
BUCKET_NAME=mybucket \
python3 test_cross_filesystem_compatibility.py
"""
import os
import secrets
import sys
import logging
from typing import Optional, Tuple
import pyarrow as pa
import pyarrow.dataset as pads
import pyarrow.fs as pafs
import pyarrow.parquet as pq
import s3fs
try:
import boto3
from botocore.exceptions import ClientError
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
from parquet_test_utils import create_sample_table
# Emit bare log messages (no level/timestamp prefix) at INFO and above.
logging.basicConfig(level=logging.INFO, format="%(message)s")

# Connection settings: each one is read from the environment and falls back
# to a default aimed at a local endpoint.
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://localhost:8333")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "some_access_key1")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "some_secret_key1")
BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket")
# Quick mode is enabled only by the exact string "1".
TEST_QUICK = os.getenv("TEST_QUICK", "0") == "1"

# Every run writes under its own random prefix so runs do not collide.
TEST_RUN_ID = secrets.token_hex(8)
TEST_DIR = "parquet-cross-fs-tests/" + TEST_RUN_ID

# Row counts per test size. The large size is intended to span multiple row
# groups; whether it does depends on the writer's row-group settings.
TEST_SIZES = {
    "small": 5,
    "large": 200_000,
}

# Quick mode keeps only the small size.
if TEST_QUICK:
    TEST_SIZES = {
        size: rows for size, rows in TEST_SIZES.items() if size == "small"
    }
    logging.info("Quick test mode enabled - running only small tests")
def init_s3fs() -> Optional[s3fs.S3FileSystem]:
    """Create the s3fs filesystem for the configured endpoint.

    Returns:
        The S3FileSystem instance, or None if construction raised (the
        exception is logged with its traceback).
    """
    try:
        logging.info("Initializing s3fs...")
        # The listings cache is disabled on the s3fs client.
        filesystem = s3fs.S3FileSystem(
            key=S3_ACCESS_KEY,
            secret=S3_SECRET_KEY,
            client_kwargs={"endpoint_url": S3_ENDPOINT_URL},
            use_listings_cache=False,
        )
    except Exception:
        logging.exception("✗ Failed to initialize s3fs")
        return None
    logging.info("✓ s3fs initialized successfully")
    return filesystem
def init_pyarrow_s3() -> Tuple[Optional[pafs.S3FileSystem], str, str]:
    """Create PyArrow's native S3 filesystem for the configured endpoint.

    The endpoint URL is split into a scheme and a host part because the
    PyArrow constructor receives them as separate arguments.

    Returns:
        tuple: (S3FileSystem instance, scheme, endpoint). If anything raises,
        the exception is logged and (None, "", "") is returned.
    """
    try:
        logging.info("Initializing PyArrow S3FileSystem...")

        # A URL without a recognised prefix is used verbatim over plain http.
        scheme, endpoint = "http", S3_ENDPOINT_URL
        for known_scheme in ("http", "https"):
            prefix = f"{known_scheme}://"
            if S3_ENDPOINT_URL.startswith(prefix):
                scheme = known_scheme
                endpoint = S3_ENDPOINT_URL[len(prefix):]
                break

        # Bucket creation and deletion are allowed because this is a test run.
        native_fs = pafs.S3FileSystem(
            scheme=scheme,
            endpoint_override=endpoint,
            access_key=S3_ACCESS_KEY,
            secret_key=S3_SECRET_KEY,
            allow_bucket_creation=True,
            allow_bucket_deletion=True,
        )
        logging.info("✓ PyArrow S3FileSystem initialized successfully")
        return native_fs, scheme, endpoint
    except Exception:
        logging.exception("✗ Failed to initialize PyArrow S3FileSystem")
        return None, "", ""
def ensure_bucket_exists(s3fs_fs: s3fs.S3FileSystem, pyarrow_s3: pafs.S3FileSystem) -> bool:
    """Make sure the test bucket is present, creating it through s3fs if needed.

    Args:
        s3fs_fs: s3fs filesystem used for the existence check and creation.
        pyarrow_s3: Accepted but not used by this function.

    Returns:
        True when the bucket exists or was created; False if any step raised
        (the exception is logged).
    """
    try:
        if s3fs_fs.exists(BUCKET_NAME):
            logging.info(f"✓ Bucket exists: {BUCKET_NAME}")
            return True

        logging.info(f"Creating bucket: {BUCKET_NAME}")
        try:
            s3fs_fs.mkdir(BUCKET_NAME)
        except FileExistsError:
            # Someone else created the bucket between the check and mkdir.
            logging.info(f"✓ Bucket exists: {BUCKET_NAME}")
        else:
            logging.info(f"✓ Bucket created: {BUCKET_NAME}")
        return True
    except Exception:
        logging.exception("✗ Failed to create/check bucket")
        return False
def write_with_s3fs(table: pa.Table, path: str, s3fs_fs: s3fs.S3FileSystem) -> bool:
    """Write `table` as a Parquet dataset at `path` through the s3fs filesystem.

    Returns:
        True on success; False if the write raised (the exception is logged).
    """
    try:
        pads.write_dataset(table, path, filesystem=s3fs_fs, format="parquet")
    except Exception:
        logging.exception("✗ Failed to write with s3fs")
        return False
    else:
        return True
def write_with_pyarrow_s3(table: pa.Table, path: str, pyarrow_s3: pafs.S3FileSystem) -> bool:
    """Write `table` as a Parquet dataset at `path` through PyArrow's native S3.

    Returns:
        True on success; False if the write raised (the exception is logged).
    """
    try:
        pads.write_dataset(table, path, filesystem=pyarrow_s3, format="parquet")
    except Exception:
        logging.exception("✗ Failed to write with PyArrow S3")
        return False
    else:
        return True
def read_with_s3fs(path: str, s3fs_fs: s3fs.S3FileSystem) -> Tuple[bool, Optional[pa.Table], str]:
    """Read the Parquet data at `path` through s3fs, trying several read APIs.

    The APIs are tried in a fixed order and the first one that succeeds wins.

    Returns:
        (True, table, name of the API that worked) on success, otherwise
        (False, None, all collected error messages joined with " | ").
    """
    # Each entry pairs the reported method name with a zero-argument loader.
    attempts = (
        ("pq.read_table", lambda: pq.read_table(path, filesystem=s3fs_fs)),
        (
            "pq.ParquetDataset",
            lambda: pq.ParquetDataset(path, filesystem=s3fs_fs).read(),
        ),
        (
            "pads.dataset",
            lambda: pads.dataset(
                path, format="parquet", filesystem=s3fs_fs
            ).to_table(),
        ),
    )

    failures = []
    for label, load in attempts:
        try:
            loaded = load()
        except Exception as exc:  # noqa: BLE001 - Intentionally broad for compatibility testing
            failures.append(f"{label}: {type(exc).__name__}: {exc}")
        else:
            return True, loaded, label

    return False, None, " | ".join(failures)
def read_with_pyarrow_s3(path: str, pyarrow_s3: pafs.S3FileSystem) -> Tuple[bool, Optional[pa.Table], str]:
    """Read Parquet file using PyArrow native S3 filesystem with multiple methods.

    Three read APIs are tried in order: pq.read_table, pq.ParquetDataset and
    pads.dataset. The first one that succeeds determines the result.

    Args:
        path: Location of the Parquet data, passed unchanged to each API.
        pyarrow_s3: PyArrow native S3 filesystem used for every attempt.

    Returns:
        (True, table, name of the API that worked) on success, otherwise
        (False, None, all collected error messages joined with " | ").
    """
    errors = []

    # Try pq.read_table
    try:
        table = pq.read_table(path, filesystem=pyarrow_s3)
    except Exception as e:  # noqa: BLE001 - Intentionally broad for compatibility testing
        errors.append(f"pq.read_table: {type(e).__name__}: {e}")
    else:
        return True, table, "pq.read_table"

    # Try pq.ParquetDataset
    try:
        dataset = pq.ParquetDataset(path, filesystem=pyarrow_s3)
        table = dataset.read()
    except Exception as e:  # noqa: BLE001 - Intentionally broad for compatibility testing
        errors.append(f"pq.ParquetDataset: {type(e).__name__}: {e}")
    else:
        return True, table, "pq.ParquetDataset"

    # Try pads.dataset. The format is stated explicitly, as read_with_s3fs
    # does, so both read paths issue the same call.
    try:
        dataset = pads.dataset(path, format="parquet", filesystem=pyarrow_s3)
        table = dataset.to_table()
    except Exception as e:  # noqa: BLE001 - Intentionally broad for compatibility testing
        errors.append(f"pads.dataset: {type(e).__name__}: {e}")
    else:
        return True, table, "pads.dataset"

    return False, None, " | ".join(errors)
def verify_table_integrity(original: pa.Table, read: pa.Table) -> Tuple[bool, str]:
    """Verify that read table matches the original table.

    Row count and schema are compared first. The data is then compared after
    sorting both tables by the 'id' column, so a different row order in the
    read table does not count as a mismatch. Tables without an 'id' column
    are compared in their existing row order instead of raising.

    Args:
        original: The table that was written.
        read: The table that was read back.

    Returns:
        (True, success message) when the tables match, otherwise
        (False, description of the first kind of mismatch found).
    """
    # Check row count
    if read.num_rows != original.num_rows:
        return False, f"Row count mismatch: expected {original.num_rows}, got {read.num_rows}"

    # Check schema
    if not read.schema.equals(original.schema):
        return False, f"Schema mismatch: expected {original.schema}, got {read.schema}"

    # Sort both tables by 'id' before comparison to handle potential row order
    # differences. The schemas are equal at this point, so checking one table
    # for the column is enough.
    if "id" in original.column_names:
        sort_keys = [("id", "ascending")]
        original_sorted = original.sort_by(sort_keys)
        read_sorted = read.sort_by(sort_keys)
    else:
        # No sort key available: fall back to an order-sensitive comparison.
        original_sorted, read_sorted = original, read

    # Check data equality
    if not read_sorted.equals(original_sorted):
        # Name every column whose contents differ.
        error_details = [
            f"column '{col_name}' differs"
            for col_name in original.column_names
            if not original_sorted.column(col_name).equals(read_sorted.column(col_name))
        ]
        return False, f"Data mismatch: {', '.join(error_details)}"

    return True, "Data verified successfully"
def test_write_s3fs_read_pyarrow(
    test_name: str,
    num_rows: int,
    s3fs_fs: s3fs.S3FileSystem,
    pyarrow_s3: pafs.S3FileSystem
) -> Tuple[bool, str]:
    """Round-trip a sample table: written via s3fs, read via PyArrow native S3.

    Returns:
        (True, description of the read method used) when the data read back
        matches what was written, otherwise (False, reason for the failure).
        Unexpected exceptions are logged and reported as a failure.
    """
    try:
        expected = create_sample_table(num_rows)
        path = "/".join((BUCKET_NAME, TEST_DIR, test_name, "data.parquet"))

        # Step 1: write through s3fs.
        logging.info(f" Writing {num_rows:,} rows with s3fs to {path}...")
        written = write_with_s3fs(expected, path, s3fs_fs)
        if not written:
            return False, "Write with s3fs failed"
        logging.info(" ✓ Write completed")

        # Step 2: read back through PyArrow native S3. On failure `method`
        # carries the collected error text rather than a method name.
        logging.info(" Reading with PyArrow native S3...")
        read_ok, actual, method = read_with_pyarrow_s3(path, pyarrow_s3)
        if not read_ok:
            return False, f"Read with PyArrow S3 failed: {method}"
        logging.info(f" ✓ Read {actual.num_rows:,} rows using {method}")

        # Step 3: compare what came back with what was written.
        matches, detail = verify_table_integrity(expected, actual)
        if not matches:
            return False, f"Verification failed: {detail}"
        logging.info(f" ✓ {detail}")

        return True, f"s3fs→PyArrow: {method}"
    except Exception as exc:  # noqa: BLE001 - Top-level exception handler for test orchestration
        logging.exception(" ✗ Test failed")
        return False, f"{type(exc).__name__}: {exc}"
def test_write_pyarrow_read_s3fs(
    test_name: str,
    num_rows: int,
    s3fs_fs: s3fs.S3FileSystem,
    pyarrow_s3: pafs.S3FileSystem
) -> Tuple[bool, str]:
    """Round-trip a sample table: written via PyArrow native S3, read via s3fs.

    Returns:
        (True, description of the read method used) when the data read back
        matches what was written, otherwise (False, reason for the failure).
        Unexpected exceptions are logged and reported as a failure.
    """
    try:
        expected = create_sample_table(num_rows)
        path = "/".join((BUCKET_NAME, TEST_DIR, test_name, "data.parquet"))

        # Step 1: write through PyArrow native S3.
        logging.info(f" Writing {num_rows:,} rows with PyArrow native S3 to {path}...")
        written = write_with_pyarrow_s3(expected, path, pyarrow_s3)
        if not written:
            return False, "Write with PyArrow S3 failed"
        logging.info(" ✓ Write completed")

        # Step 2: read back through s3fs. On failure `method` carries the
        # collected error text rather than a method name.
        logging.info(" Reading with s3fs...")
        read_ok, actual, method = read_with_s3fs(path, s3fs_fs)
        if not read_ok:
            return False, f"Read with s3fs failed: {method}"
        logging.info(f" ✓ Read {actual.num_rows:,} rows using {method}")

        # Step 3: compare what came back with what was written.
        matches, detail = verify_table_integrity(expected, actual)
        if not matches:
            return False, f"Verification failed: {detail}"
        logging.info(f" ✓ {detail}")

        return True, f"PyArrow→s3fs: {method}"
    except Exception as exc:  # noqa: BLE001 - Top-level exception handler for test orchestration
        logging.exception(" ✗ Test failed")
        return False, f"{type(exc).__name__}: {exc}"
def cleanup_test_files(s3fs_fs: s3fs.S3FileSystem) -> None:
    """Remove this run's test directory from the bucket, if it exists.

    Best effort: any exception is logged and not propagated.
    """
    run_prefix = "/".join((BUCKET_NAME, TEST_DIR))
    try:
        if not s3fs_fs.exists(run_prefix):
            return
        logging.info(f"Cleaning up test directory: {run_prefix}")
        s3fs_fs.rm(run_prefix, recursive=True)
        logging.info("✓ Test directory cleaned up")
    except Exception:
        logging.exception("Failed to cleanup test directory")
def main():
    """Run cross-filesystem compatibility tests.

    Prints the configuration, initializes both filesystems, ensures the
    bucket exists, then runs both write/read directions for every entry in
    TEST_SIZES and prints a summary.

    Returns:
        0 when every test passed; 1 when any test failed or when a
        filesystem or the bucket could not be set up.
    """
    banner = "=" * 80

    print(banner)
    print("Cross-Filesystem Compatibility Tests for PyArrow Parquet")
    print("Testing: s3fs ↔ PyArrow Native S3 Filesystem")
    if TEST_QUICK:
        print("*** QUICK TEST MODE - Small files only ***")
    print(banner + "\n")

    print("Configuration:")
    print(f" S3 Endpoint: {S3_ENDPOINT_URL}")
    print(f" Access Key: {S3_ACCESS_KEY}")
    print(f" Bucket: {BUCKET_NAME}")
    print(f" Test Directory: {TEST_DIR}")
    print(f" Quick Mode: {'Yes (small files only)' if TEST_QUICK else 'No (all file sizes)'}")
    print(f" PyArrow Version: {pa.__version__}")
    print()

    # Initialize both filesystems
    s3fs_fs = init_s3fs()
    if s3fs_fs is None:
        print("Cannot proceed without s3fs connection")
        return 1

    pyarrow_s3, _scheme, _endpoint = init_pyarrow_s3()
    if pyarrow_s3 is None:
        print("Cannot proceed without PyArrow S3 connection")
        return 1
    print()

    # Ensure bucket exists
    if not ensure_bucket_exists(s3fs_fs, pyarrow_s3):
        print("Cannot proceed without bucket")
        return 1
    print()

    # Each direction: (test-name suffix, heading printed before the run, runner).
    directions = (
        (
            "s3fs_to_pyarrow",
            "Test: Write with s3fs → Read with PyArrow native S3",
            test_write_s3fs_read_pyarrow,
        ),
        (
            "pyarrow_to_s3fs",
            "Test: Write with PyArrow native S3 → Read with s3fs",
            test_write_pyarrow_read_s3fs,
        ),
    )

    results = []
    try:
        # Test all file sizes
        for size_name, num_rows in TEST_SIZES.items():
            print(f"\n{banner}")
            print(f"Testing with {size_name} files ({num_rows:,} rows)")
            print(f"{banner}\n")

            for suffix, heading, run_case in directions:
                test_name = f"{size_name}_{suffix}"
                print(heading)
                success, message = run_case(test_name, num_rows, s3fs_fs, pyarrow_s3)
                results.append((test_name, success, message))
                status = "✓ PASS" if success else "✗ FAIL"
                print(f"{status}: {message}\n")

        # Summary
        print("\n" + banner)
        print("SUMMARY")
        print(banner)

        passed = sum(1 for _, success, _ in results if success)
        total = len(results)
        print(f"\nTotal: {passed}/{total} passed\n")

        for test_name, success, message in results:
            status = "✓" if success else "✗"
            print(f" {status} {test_name}: {message}")

        print("\n" + banner)
        if passed == total:
            print("✓ ALL CROSS-FILESYSTEM TESTS PASSED!")
            print()
            print("Conclusion: Files written with s3fs and PyArrow native S3 are")
            print("fully compatible and can be read by either filesystem implementation.")
        else:
            print(f"✗ {total - passed} test(s) failed")
        print(banner + "\n")
    finally:
        # Cleanup runs even if the run is interrupted or a case raises, so
        # the run's test directory is not left behind in the bucket.
        cleanup_test_files(s3fs_fs)

    return 0 if passed == total else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|