Diffstat (limited to 'test/s3')
| -rw-r--r-- | test/s3/parquet/.gitignore | 40 |
| -rw-r--r-- | test/s3/parquet/FINAL_ROOT_CAUSE_ANALYSIS.md | 58 |
| -rw-r--r-- | test/s3/parquet/MINIO_DIRECTORY_HANDLING.md | 70 |
| -rw-r--r-- | test/s3/parquet/Makefile | 365 |
| -rw-r--r-- | test/s3/parquet/README.md | 206 |
| -rw-r--r-- | test/s3/parquet/TEST_COVERAGE.md | 46 |
| -rw-r--r-- | test/s3/parquet/requirements.txt | 7 |
| -rwxr-xr-x | test/s3/parquet/s3_parquet_test.py | 421 |
| -rwxr-xr-x | test/s3/parquet/test_implicit_directory_fix.py | 307 |
| -rw-r--r-- | test/s3/sse/s3_range_headers_test.go | 104 |
| -rw-r--r-- | test/s3/sse/s3_sse_range_server_test.go | 445 |
11 files changed, 2069 insertions, 0 deletions
diff --git a/test/s3/parquet/.gitignore b/test/s3/parquet/.gitignore new file mode 100644 index 000000000..75800e63c --- /dev/null +++ b/test/s3/parquet/.gitignore @@ -0,0 +1,40 @@ +# Python virtual environment +venv/ +.venv/ +env/ +ENV/ + +# Python cache +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python + +# Test artifacts +*.log +test_run.log +weed-test.log + +# SeaweedFS data directories +filerldb2/ +idx/ +dat/ +*.idx +*.dat + +# Temporary test files +.pytest_cache/ +.coverage +htmlcov/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/test/s3/parquet/FINAL_ROOT_CAUSE_ANALYSIS.md b/test/s3/parquet/FINAL_ROOT_CAUSE_ANALYSIS.md new file mode 100644 index 000000000..3dff9cb03 --- /dev/null +++ b/test/s3/parquet/FINAL_ROOT_CAUSE_ANALYSIS.md @@ -0,0 +1,58 @@ +# Final Root Cause Analysis + +## Overview + +This document provides a deep technical analysis of the s3fs compatibility issue with PyArrow Parquet datasets on SeaweedFS, and the solution implemented to resolve it. + +## Root Cause + +When PyArrow writes datasets using `write_dataset()`, it creates implicit directory structures by writing files without explicit directory markers. However, some S3 workflows may create 0-byte directory markers. + +### The Problem + +1. **PyArrow writes dataset files** without creating explicit directory objects +2. **s3fs calls HEAD** on the directory path to check if it exists +3. **If HEAD returns 200** with `Content-Length: 0`, s3fs interprets it as a file (not a directory) +4. **PyArrow fails** when trying to read, reporting "Parquet file size is 0 bytes" + +### AWS S3 Behavior + +AWS S3 returns **404 Not Found** for implicit directories (directories that only exist because they have children but no explicit marker object). This allows s3fs to fall back to LIST operations to detect the directory. + +## The Solution + +### Implementation + +Modified the S3 API HEAD handler in `weed/s3api/s3api_object_handlers.go` to: + +1. **Check if object ends with `/`**: Explicit directory markers return 200 as before +2. **Check if object has children**: If a 0-byte object has children in the filer, treat it as an implicit directory +3. 
**Return 404 for implicit directories**: This matches AWS S3 behavior and triggers s3fs's LIST fallback + +### Code Changes + +The fix is implemented in the `HeadObjectHandler` function with logic to: +- Detect implicit directories by checking for child entries +- Return 404 (NoSuchKey) for implicit directories +- Preserve existing behavior for explicit directory markers and regular files + +## Performance Considerations + +### Optimization: Child Check Cache +- Child existence checks are performed via filer LIST operations +- Results could be cached for frequently accessed paths +- Trade-off between consistency and performance + +### Impact +- Minimal performance impact for normal file operations +- Slight overhead for HEAD requests on implicit directories (one additional LIST call) +- Overall improvement in PyArrow compatibility outweighs minor performance cost + +## TODO + +- [ ] Add detailed benchmarking results comparing before/after fix +- [ ] Document edge cases discovered during implementation +- [ ] Add architectural diagrams showing the request flow +- [ ] Document alternative solutions considered and why they were rejected +- [ ] Add performance profiling data for child existence checks + diff --git a/test/s3/parquet/MINIO_DIRECTORY_HANDLING.md b/test/s3/parquet/MINIO_DIRECTORY_HANDLING.md new file mode 100644 index 000000000..04d80cfcb --- /dev/null +++ b/test/s3/parquet/MINIO_DIRECTORY_HANDLING.md @@ -0,0 +1,70 @@ +# MinIO Directory Handling Comparison + +## Overview + +This document compares how MinIO handles directory markers versus SeaweedFS's implementation, and explains the different approaches to S3 directory semantics. + +## MinIO's Approach + +MinIO handles implicit directories similarly to AWS S3: + +1. **No explicit directory objects**: Directories are implicit, defined only by object key prefixes +2. **HEAD on directory returns 404**: Consistent with AWS S3 behavior +3. **LIST operations reveal directories**: Directories are discovered through delimiter-based LIST operations +4. **Automatic prefix handling**: MinIO automatically recognizes prefixes as directories + +### MinIO Implementation Details + +- Uses in-memory metadata for fast prefix lookups +- Optimized for LIST operations with common delimiter (`/`) +- No persistent directory objects in storage layer +- Directories "exist" as long as they contain objects + +## SeaweedFS Approach + +SeaweedFS uses a filer-based approach with real directory entries: + +### Before the Fix + +1. **Explicit directory objects**: Could create 0-byte objects as directory markers +2. **HEAD returns 200**: Even for implicit directories +3. **Caused s3fs issues**: s3fs interpreted 0-byte HEAD responses as empty files + +### After the Fix + +1. **Hybrid approach**: Supports both explicit markers (with `/` suffix) and implicit directories +2. **HEAD returns 404 for implicit directories**: Matches AWS S3 and MinIO behavior +3. **Filer integration**: Uses filer's directory metadata to detect implicit directories +4. 
**s3fs compatibility**: Triggers proper LIST fallback behavior + +## Key Differences + +| Aspect | MinIO | SeaweedFS (After Fix) | +|--------|-------|----------------------| +| Directory Storage | No persistent objects | Filer directory entries | +| Implicit Directory HEAD | 404 Not Found | 404 Not Found | +| Explicit Marker HEAD | Not applicable | 200 OK (with `/` suffix) | +| Child Detection | Prefix scan | Filer LIST operation | +| Performance | In-memory lookups | Filer gRPC calls | + +## Implementation Considerations + +### Advantages of SeaweedFS Approach +- Integrates with existing filer metadata +- Supports both implicit and explicit directories +- Preserves directory metadata and attributes +- Compatible with POSIX filer semantics + +### Trade-offs +- Additional filer communication overhead for HEAD requests +- Complexity of supporting both directory paradigms +- Performance depends on filer efficiency + +## TODO + +- [ ] Add performance benchmark comparison: MinIO vs SeaweedFS +- [ ] Document edge cases where behaviors differ +- [ ] Add example request/response traces for both systems +- [ ] Document migration path for users moving from MinIO to SeaweedFS +- [ ] Add compatibility matrix for different S3 clients + diff --git a/test/s3/parquet/Makefile b/test/s3/parquet/Makefile new file mode 100644 index 000000000..dd65b6e9f --- /dev/null +++ b/test/s3/parquet/Makefile @@ -0,0 +1,365 @@ +# Makefile for S3 Parquet Integration Tests +# This Makefile provides targets for running comprehensive S3 Parquet tests with PyArrow + +# Default values +SEAWEEDFS_BINARY ?= weed +S3_PORT ?= 8333 +FILER_PORT ?= 8888 +VOLUME_PORT ?= 8080 +MASTER_PORT ?= 9333 +TEST_TIMEOUT ?= 15m +ACCESS_KEY ?= some_access_key1 +SECRET_KEY ?= some_secret_key1 +VOLUME_MAX_SIZE_MB ?= 50 +VOLUME_MAX_COUNT ?= 100 +BUCKET_NAME ?= test-parquet-bucket + +# Python configuration +PYTHON ?= python3 +VENV_DIR ?= .venv +PYTHON_TEST_SCRIPT ?= s3_parquet_test.py + +# Test directory +TEST_DIR := $(shell pwd) +SEAWEEDFS_ROOT := $(shell cd ../../../ && pwd) + +# Colors for output +RED := \033[0;31m +GREEN := \033[0;32m +YELLOW := \033[1;33m +NC := \033[0m # No Color + +.PHONY: all build-weed check-binary check-python ci-test clean debug-logs debug-status help manual-start manual-stop setup-python start-seaweedfs start-seaweedfs-ci stop-seaweedfs stop-seaweedfs-safe test test-implicit-dir test-implicit-dir-with-server test-quick test-with-server + +all: test + +# Build SeaweedFS binary (GitHub Actions compatible) +build-weed: + @echo "Building SeaweedFS binary..." + @cd $(SEAWEEDFS_ROOT)/weed && go install -buildvcs=false + @echo "ā
SeaweedFS binary built successfully" + +help: + @echo "SeaweedFS S3 Parquet Integration Tests" + @echo "" + @echo "Available targets:" + @echo " test - Run full S3 Parquet integration tests (small and large files)" + @echo " test-with-server - Run full tests with automatic server management (CI compatible)" + @echo " test-quick - Run quick tests with small files only (sets TEST_QUICK=1)" + @echo " test-implicit-dir - Test implicit directory fix for s3fs compatibility" + @echo " test-implicit-dir-with-server - Test implicit directory fix with server management" + @echo " setup-python - Setup Python virtual environment and install dependencies" + @echo " check-python - Check if Python and required packages are available" + @echo " start-seaweedfs - Start SeaweedFS server for testing" + @echo " start-seaweedfs-ci - Start SeaweedFS server (CI-safe version)" + @echo " stop-seaweedfs - Stop SeaweedFS server" + @echo " stop-seaweedfs-safe - Stop SeaweedFS server (CI-safe version)" + @echo " clean - Clean up test artifacts" + @echo " check-binary - Check if SeaweedFS binary exists" + @echo " build-weed - Build SeaweedFS binary" + @echo "" + @echo "Configuration:" + @echo " SEAWEEDFS_BINARY=$(SEAWEEDFS_BINARY)" + @echo " S3_PORT=$(S3_PORT)" + @echo " FILER_PORT=$(FILER_PORT)" + @echo " VOLUME_PORT=$(VOLUME_PORT)" + @echo " MASTER_PORT=$(MASTER_PORT)" + @echo " BUCKET_NAME=$(BUCKET_NAME)" + @echo " VOLUME_MAX_SIZE_MB=$(VOLUME_MAX_SIZE_MB)" + @echo " PYTHON=$(PYTHON)" + +check-binary: + @if ! command -v $(SEAWEEDFS_BINARY) > /dev/null 2>&1; then \ + echo "$(RED)Error: SeaweedFS binary '$(SEAWEEDFS_BINARY)' not found in PATH$(NC)"; \ + echo "Please build SeaweedFS first by running 'make' in the root directory"; \ + exit 1; \ + fi + @echo "$(GREEN)SeaweedFS binary found: $$(which $(SEAWEEDFS_BINARY))$(NC)" + +check-python: + @if ! command -v $(PYTHON) > /dev/null 2>&1; then \ + echo "$(RED)Error: Python '$(PYTHON)' not found$(NC)"; \ + echo "Please install Python 3.8 or later"; \ + exit 1; \ + fi + @echo "$(GREEN)Python found: $$(which $(PYTHON)) ($$($(PYTHON) --version))$(NC)" + +setup-python: check-python + @echo "$(YELLOW)Setting up Python virtual environment...$(NC)" + @if [ ! -d "$(VENV_DIR)" ]; then \ + $(PYTHON) -m venv $(VENV_DIR); \ + echo "$(GREEN)Virtual environment created$(NC)"; \ + fi + @echo "$(YELLOW)Installing Python dependencies...$(NC)" + @$(VENV_DIR)/bin/pip install --upgrade pip > /dev/null + @$(VENV_DIR)/bin/pip install -r requirements.txt + @echo "$(GREEN)Python dependencies installed successfully$(NC)" + +start-seaweedfs-ci: check-binary + @echo "$(YELLOW)Starting SeaweedFS server for Parquet testing...$(NC)" + + # Clean up any existing processes first (CI-safe) + @echo "Cleaning up any existing processes..." 
+ @if command -v lsof >/dev/null 2>&1; then \ + lsof -ti :$(MASTER_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$(VOLUME_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$(FILER_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$(S3_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$$(( $(MASTER_PORT) + 10000 )) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$$(( $(VOLUME_PORT) + 10000 )) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$$(( $(FILER_PORT) + 10000 )) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + fi + @sleep 2 + + # Create necessary directories + @mkdir -p /tmp/seaweedfs-test-parquet-master + @mkdir -p /tmp/seaweedfs-test-parquet-volume + @mkdir -p /tmp/seaweedfs-test-parquet-filer + + # Clean up any old server logs + @rm -f /tmp/seaweedfs-parquet-*.log || true + + # Start master server with volume size limit and explicit gRPC port + @echo "Starting master server..." + @nohup $(SEAWEEDFS_BINARY) master -port=$(MASTER_PORT) -port.grpc=$$(( $(MASTER_PORT) + 10000 )) -mdir=/tmp/seaweedfs-test-parquet-master -volumeSizeLimitMB=$(VOLUME_MAX_SIZE_MB) -ip=127.0.0.1 -peers=none > /tmp/seaweedfs-parquet-master.log 2>&1 & + @sleep 3 + + # Start volume server with master HTTP port and increased capacity + @echo "Starting volume server..." + @nohup $(SEAWEEDFS_BINARY) volume -port=$(VOLUME_PORT) -mserver=127.0.0.1:$(MASTER_PORT) -dir=/tmp/seaweedfs-test-parquet-volume -max=$(VOLUME_MAX_COUNT) -ip=127.0.0.1 -preStopSeconds=1 > /tmp/seaweedfs-parquet-volume.log 2>&1 & + @sleep 5 + + # Start filer server with embedded S3 + @echo "Starting filer server with embedded S3..." 
+ @printf '{"identities":[{"name":"%s","credentials":[{"accessKey":"%s","secretKey":"%s"}],"actions":["Admin","Read","Write"]}]}' "$(ACCESS_KEY)" "$(ACCESS_KEY)" "$(SECRET_KEY)" > /tmp/seaweedfs-parquet-s3.json + @AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) nohup $(SEAWEEDFS_BINARY) filer -port=$(FILER_PORT) -port.grpc=$$(( $(FILER_PORT) + 10000 )) -master=127.0.0.1:$(MASTER_PORT) -dataCenter=defaultDataCenter -ip=127.0.0.1 -s3 -s3.port=$(S3_PORT) -s3.config=/tmp/seaweedfs-parquet-s3.json > /tmp/seaweedfs-parquet-filer.log 2>&1 & + @sleep 5 + + # Wait for S3 service to be ready - use port-based checking for reliability + @echo "$(YELLOW)Waiting for S3 service to be ready...$(NC)" + @for i in $$(seq 1 20); do \ + if netstat -an 2>/dev/null | grep -q ":$(S3_PORT).*LISTEN" || \ + ss -an 2>/dev/null | grep -q ":$(S3_PORT).*LISTEN" || \ + lsof -i :$(S3_PORT) >/dev/null 2>&1; then \ + echo "$(GREEN)S3 service is listening on port $(S3_PORT)$(NC)"; \ + sleep 1; \ + break; \ + fi; \ + if [ $$i -eq 20 ]; then \ + echo "$(RED)S3 service failed to start within 20 seconds$(NC)"; \ + echo "=== Detailed Logs ==="; \ + echo "Master log:"; tail -30 /tmp/seaweedfs-parquet-master.log || true; \ + echo "Volume log:"; tail -30 /tmp/seaweedfs-parquet-volume.log || true; \ + echo "Filer log:"; tail -30 /tmp/seaweedfs-parquet-filer.log || true; \ + echo "=== Port Status ==="; \ + netstat -an 2>/dev/null | grep ":$(S3_PORT)" || \ + ss -an 2>/dev/null | grep ":$(S3_PORT)" || \ + echo "No port listening on $(S3_PORT)"; \ + exit 1; \ + fi; \ + echo "Waiting for S3 service... ($$i/20)"; \ + sleep 1; \ + done + + # Additional wait for filer gRPC to be ready + @echo "$(YELLOW)Waiting for filer gRPC to be ready...$(NC)" + @sleep 2 + + # Wait for volume server to register with master and ensure volume assignment works + @echo "$(YELLOW)Waiting for volume assignment to be ready...$(NC)" + @for i in $$(seq 1 30); do \ + ASSIGN_RESULT=$$(curl -s "http://localhost:$(MASTER_PORT)/dir/assign?count=1" 2>/dev/null); \ + if echo "$$ASSIGN_RESULT" | grep -q '"fid"'; then \ + echo "$(GREEN)Volume assignment is ready$(NC)"; \ + break; \ + fi; \ + if [ $$i -eq 30 ]; then \ + echo "$(RED)Volume assignment not ready after 30 seconds$(NC)"; \ + echo "=== Last assign attempt ==="; \ + echo "$$ASSIGN_RESULT"; \ + echo "=== Master Status ==="; \ + curl -s "http://localhost:$(MASTER_PORT)/dir/status" 2>/dev/null || echo "Failed to get master status"; \ + echo "=== Master Logs ==="; \ + tail -50 /tmp/seaweedfs-parquet-master.log 2>/dev/null || echo "No master log"; \ + echo "=== Volume Logs ==="; \ + tail -50 /tmp/seaweedfs-parquet-volume.log 2>/dev/null || echo "No volume log"; \ + exit 1; \ + fi; \ + echo "Waiting for volume assignment... ($$i/30)"; \ + sleep 1; \ + done + + @echo "$(GREEN)SeaweedFS server started successfully for Parquet testing$(NC)" + @echo "Master: http://localhost:$(MASTER_PORT)" + @echo "Volume: http://localhost:$(VOLUME_PORT)" + @echo "Filer: http://localhost:$(FILER_PORT)" + @echo "S3: http://localhost:$(S3_PORT)" + @echo "Volume Max Size: $(VOLUME_MAX_SIZE_MB)MB" + +start-seaweedfs: check-binary + @echo "$(YELLOW)Starting SeaweedFS server for Parquet testing...$(NC)" + @# Use port-based cleanup for consistency and safety + @echo "Cleaning up any existing processes..." 
+ @lsof -ti :$(MASTER_PORT) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$(VOLUME_PORT) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$(FILER_PORT) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$(S3_PORT) 2>/dev/null | xargs -r kill -TERM || true + @# Clean up gRPC ports (HTTP port + 10000) + @lsof -ti :$$(( $(MASTER_PORT) + 10000 )) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$$(( $(VOLUME_PORT) + 10000 )) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$$(( $(FILER_PORT) + 10000 )) 2>/dev/null | xargs -r kill -TERM || true + @sleep 2 + @$(MAKE) start-seaweedfs-ci + +stop-seaweedfs: + @echo "$(YELLOW)Stopping SeaweedFS server...$(NC)" + @# Use port-based cleanup for consistency and safety + @lsof -ti :$(MASTER_PORT) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$(VOLUME_PORT) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$(FILER_PORT) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$(S3_PORT) 2>/dev/null | xargs -r kill -TERM || true + @# Clean up gRPC ports (HTTP port + 10000) + @lsof -ti :$$(( $(MASTER_PORT) + 10000 )) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$$(( $(VOLUME_PORT) + 10000 )) 2>/dev/null | xargs -r kill -TERM || true + @lsof -ti :$$(( $(FILER_PORT) + 10000 )) 2>/dev/null | xargs -r kill -TERM || true + @sleep 2 + @echo "$(GREEN)SeaweedFS server stopped$(NC)" + +# CI-safe server stop that's more conservative +stop-seaweedfs-safe: + @echo "$(YELLOW)Safely stopping SeaweedFS server...$(NC)" + @# Use port-based cleanup which is safer in CI + @if command -v lsof >/dev/null 2>&1; then \ + echo "Using lsof for port-based cleanup..."; \ + lsof -ti :$(MASTER_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$(VOLUME_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$(FILER_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$(S3_PORT) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$$(( $(MASTER_PORT) + 10000 )) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$$(( $(VOLUME_PORT) + 10000 )) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + lsof -ti :$$(( $(FILER_PORT) + 10000 )) 2>/dev/null | head -5 | while read pid; do kill -TERM $$pid 2>/dev/null || true; done; \ + else \ + echo "lsof not available, using netstat approach..."; \ + netstat -tlnp 2>/dev/null | grep :$(MASTER_PORT) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + netstat -tlnp 2>/dev/null | grep :$(VOLUME_PORT) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + netstat -tlnp 2>/dev/null | grep :$(FILER_PORT) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + netstat -tlnp 2>/dev/null | grep :$(S3_PORT) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + netstat -tlnp 2>/dev/null | grep :$$(( $(MASTER_PORT) + 10000 )) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + netstat -tlnp 2>/dev/null | grep :$$(( 
$(VOLUME_PORT) + 10000 )) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + netstat -tlnp 2>/dev/null | grep :$$(( $(FILER_PORT) + 10000 )) | awk '{print $$7}' | cut -d/ -f1 | head -5 | while read pid; do [ "$$pid" != "-" ] && kill -TERM $$pid 2>/dev/null || true; done; \ + fi + @sleep 2 + @echo "$(GREEN)SeaweedFS server safely stopped$(NC)" + +clean: + @echo "$(YELLOW)Cleaning up Parquet test artifacts...$(NC)" + @rm -rf /tmp/seaweedfs-test-parquet-* + @rm -f /tmp/seaweedfs-parquet-*.log + @rm -f /tmp/seaweedfs-parquet-s3.json + @rm -f s3_parquet_test_errors_*.log + @rm -rf $(VENV_DIR) + @echo "$(GREEN)Parquet test cleanup completed$(NC)" + +# Test with automatic server management (GitHub Actions compatible) +test-with-server: build-weed setup-python + @echo "š Starting Parquet integration tests with automated server management..." + @echo "Starting SeaweedFS cluster..." + @if $(MAKE) start-seaweedfs-ci > weed-test.log 2>&1; then \ + echo "ā
SeaweedFS cluster started successfully"; \ + echo "Running Parquet integration tests..."; \ + trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \ + S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + $(VENV_DIR)/bin/$(PYTHON) $(PYTHON_TEST_SCRIPT) || exit 1; \ + echo "ā
All tests completed successfully"; \ + $(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true; \ + else \ + echo "ā Failed to start SeaweedFS cluster"; \ + echo "=== Server startup logs ==="; \ + tail -100 weed-test.log 2>/dev/null || echo "No startup log available"; \ + echo "=== System information ==="; \ + ps aux | grep -E "weed|make" | grep -v grep || echo "No relevant processes found"; \ + exit 1; \ + fi + +# Run tests assuming SeaweedFS is already running +test: setup-python + @echo "$(YELLOW)Running Parquet integration tests...$(NC)" + @echo "$(YELLOW)Assuming SeaweedFS is already running on localhost:$(S3_PORT)$(NC)" + @S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + $(VENV_DIR)/bin/$(PYTHON) $(PYTHON_TEST_SCRIPT) + +# Run quick tests with small files only +test-quick: setup-python + @echo "$(YELLOW)Running quick Parquet tests (small files only)...$(NC)" + @echo "$(YELLOW)Assuming SeaweedFS is already running on localhost:$(S3_PORT)$(NC)" + @S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=$(BUCKET_NAME) \ + TEST_QUICK=1 \ + $(VENV_DIR)/bin/$(PYTHON) $(PYTHON_TEST_SCRIPT) + +# Test implicit directory fix for s3fs compatibility +test-implicit-dir: setup-python + @echo "$(YELLOW)Running implicit directory fix tests...$(NC)" + @echo "$(YELLOW)Assuming SeaweedFS is already running on localhost:$(S3_PORT)$(NC)" + @S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=test-implicit-dir \ + $(VENV_DIR)/bin/$(PYTHON) test_implicit_directory_fix.py + +# Test implicit directory fix with automatic server management +test-implicit-dir-with-server: build-weed setup-python + @echo "š Starting implicit directory fix tests with automated server management..." + @echo "Starting SeaweedFS cluster..." + @if $(MAKE) start-seaweedfs-ci > weed-test.log 2>&1; then \ + echo "ā
SeaweedFS cluster started successfully"; \ + echo "Running implicit directory fix tests..."; \ + trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \ + S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \ + S3_ACCESS_KEY=$(ACCESS_KEY) \ + S3_SECRET_KEY=$(SECRET_KEY) \ + BUCKET_NAME=test-implicit-dir \ + $(VENV_DIR)/bin/$(PYTHON) test_implicit_directory_fix.py || exit 1; \ + echo "ā
All tests completed successfully"; \ + $(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true; \ + else \ + echo "ā Failed to start SeaweedFS cluster"; \ + echo "=== Server startup logs ==="; \ + tail -100 weed-test.log 2>/dev/null || echo "No startup log available"; \ + exit 1; \ + fi + +# Debug targets +debug-logs: + @echo "$(YELLOW)=== Master Log ===$(NC)" + @tail -n 50 /tmp/seaweedfs-parquet-master.log || echo "No master log found" + @echo "$(YELLOW)=== Volume Log ===$(NC)" + @tail -n 50 /tmp/seaweedfs-parquet-volume.log || echo "No volume log found" + @echo "$(YELLOW)=== Filer Log ===$(NC)" + @tail -n 50 /tmp/seaweedfs-parquet-filer.log || echo "No filer log found" + +debug-status: + @echo "$(YELLOW)=== Process Status ===$(NC)" + @ps aux | grep -E "(weed|seaweedfs)" | grep -v grep || echo "No SeaweedFS processes found" + @echo "$(YELLOW)=== Port Status ===$(NC)" + @netstat -an | grep -E "($(MASTER_PORT)|$(VOLUME_PORT)|$(FILER_PORT)|$(S3_PORT))" || echo "No ports in use" + +# Manual test targets for development +manual-start: start-seaweedfs + @echo "$(GREEN)SeaweedFS with S3 is now running for manual testing$(NC)" + @echo "You can now run Parquet tests manually" + @echo "Run 'make manual-stop' when finished" + +manual-stop: stop-seaweedfs clean + +# CI/CD targets +ci-test: test-with-server + diff --git a/test/s3/parquet/README.md b/test/s3/parquet/README.md new file mode 100644 index 000000000..48ce3e6fc --- /dev/null +++ b/test/s3/parquet/README.md @@ -0,0 +1,206 @@ +# PyArrow Parquet S3 Compatibility Tests + +This directory contains tests for PyArrow Parquet compatibility with SeaweedFS S3 API, including the implicit directory detection fix. + +## Overview + +**Status**: ā
**All PyArrow methods work correctly with SeaweedFS** + +SeaweedFS implements implicit directory detection to improve compatibility with s3fs and PyArrow. When PyArrow writes datasets using `write_dataset()`, it may create directory markers that can confuse s3fs. SeaweedFS now handles these correctly by returning 404 for HEAD requests on implicit directories (directories with children), forcing s3fs to use LIST-based discovery. + +## Quick Start + +### Running Tests + +```bash +# Setup Python environment +make setup-python + +# Run all tests with server (small and large files) +make test-with-server + +# Run quick tests with small files only (faster for development) +make test-quick + +# Run implicit directory fix tests +make test-implicit-dir-with-server + +# Clean up +make clean +``` + +### Using PyArrow with SeaweedFS + +```python +import pyarrow as pa +import pyarrow.parquet as pq +import pyarrow.dataset as pads +import s3fs + +# Configure s3fs +fs = s3fs.S3FileSystem( + key='your_access_key', + secret='your_secret_key', + endpoint_url='http://localhost:8333', + use_ssl=False +) + +# Write dataset (creates directory structure) +table = pa.table({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) +pads.write_dataset(table, 'bucket/dataset', filesystem=fs) + +# Read dataset (all methods work!) +dataset = pads.dataset('bucket/dataset', filesystem=fs) # ā
+table = pq.read_table('bucket/dataset', filesystem=fs)       # ✅
+dataset = pq.ParquetDataset('bucket/dataset', filesystem=fs) # ✅
+```
+
+## Test Files
+
+### Main Test Suite
+- **`s3_parquet_test.py`** - Comprehensive PyArrow test suite
+  - Tests 2 write methods × 5 read methods × 2 dataset sizes = 20 combinations
+  - All tests pass with the implicit directory fix ✅
+
+### Implicit Directory Tests
+- **`test_implicit_directory_fix.py`** - Specific tests for the implicit directory fix
+  - Tests HEAD request behavior
+  - Tests s3fs directory detection
+  - Tests PyArrow dataset reading
+  - All 6 tests pass ✅
+ +### Configuration +- **`Makefile`** - Build and test automation +- **`requirements.txt`** - Python dependencies (pyarrow, s3fs, boto3) +- **`.gitignore`** - Ignore patterns for test artifacts + +## Documentation + +### Technical Documentation +- **`TEST_COVERAGE.md`** - Comprehensive test coverage documentation + - Unit tests (Go): 17 test cases + - Integration tests (Python): 6 test cases + - End-to-end tests (Python): 20 test cases + +- **`FINAL_ROOT_CAUSE_ANALYSIS.md`** - Deep technical analysis + - Root cause of the s3fs compatibility issue + - How the implicit directory fix works + - Performance considerations + +- **`MINIO_DIRECTORY_HANDLING.md`** - Comparison with MinIO + - How MinIO handles directory markers + - Differences in implementation approaches + +## The Implicit Directory Fix + +### Problem +When PyArrow writes datasets with `write_dataset()`, it may create 0-byte directory markers. s3fs's `info()` method calls HEAD on these paths, and if HEAD returns 200 with size=0, s3fs incorrectly reports them as files instead of directories. This causes PyArrow to fail with "Parquet file size is 0 bytes". + +### Solution +SeaweedFS now returns 404 for HEAD requests on implicit directories (0-byte objects or directories with children, when requested without a trailing slash). This forces s3fs to fall back to LIST-based discovery, which correctly identifies directories by checking for children. + +### Implementation +The fix is implemented in `weed/s3api/s3api_object_handlers.go`: +- `HeadObjectHandler` - Returns 404 for implicit directories +- `hasChildren` - Helper function to check if a path has children + +See the source code for detailed inline documentation. + +### Test Coverage +- **Unit tests** (Go): `weed/s3api/s3api_implicit_directory_test.go` + - Run: `cd weed/s3api && go test -v -run TestImplicitDirectory` + +- **Integration tests** (Python): `test_implicit_directory_fix.py` + - Run: `cd test/s3/parquet && make test-implicit-dir-with-server` + +- **End-to-end tests** (Python): `s3_parquet_test.py` + - Run: `cd test/s3/parquet && make test-with-server` + +## Makefile Targets + +```bash +# Setup +make setup-python # Create Python virtual environment and install dependencies +make build-weed # Build SeaweedFS binary + +# Testing +make test # Run full tests (assumes server is already running) +make test-with-server # Run full PyArrow test suite with server (small + large files) +make test-quick # Run quick tests with small files only (assumes server is running) +make test-implicit-dir-with-server # Run implicit directory tests with server + +# Server Management +make start-seaweedfs-ci # Start SeaweedFS in background (CI mode) +make stop-seaweedfs-safe # Stop SeaweedFS gracefully +make clean # Clean up all test artifacts + +# Development +make help # Show all available targets +``` + +## Continuous Integration + +The tests are automatically run in GitHub Actions on every push/PR that affects S3 or filer code: + +**Workflow**: `.github/workflows/s3-parquet-tests.yml` + +**Test Matrix**: +- Python versions: 3.9, 3.11, 3.12 +- PyArrow integration tests: 20 test combinations +- Implicit directory fix tests: 6 test scenarios +- Go unit tests: 17 test cases + +**Triggers**: +- Push/PR to master (when `weed/s3api/**` or `weed/filer/**` changes) +- Manual trigger via GitHub UI (workflow_dispatch) + +## Requirements + +- Python 3.8+ +- PyArrow 22.0.0+ +- s3fs 2024.12.0+ +- boto3 1.40.0+ +- SeaweedFS (latest) + +## AWS S3 Compatibility + +The implicit directory fix makes SeaweedFS 
behavior more compatible with AWS S3:
+- AWS S3 typically doesn't create directory markers for implicit directories
+- HEAD on "dataset" (when only "dataset/file.txt" exists) returns 404 on AWS
+- SeaweedFS now matches this behavior for implicit directories with children
+
+## Edge Cases Handled
+
+✅ **Implicit directories with children** → 404 (forces LIST-based discovery)
+✅ **Empty files (0-byte, no children)** → 200 (legitimate empty file)
+✅ **Empty directories (no children)** → 200 (legitimate empty directory)
+✅ **Explicit directory requests (trailing slash)** → 200 (normal directory behavior)
+✅ **Versioned buckets** → Skip implicit directory check (versioned semantics)
+✅ **Regular files** → 200 (normal file behavior)
+
+## Performance
+
+The implicit directory check adds minimal overhead:
+- Only triggered for 0-byte objects or directories without trailing slash
+- Cost: One LIST operation with Limit=1 (~1-5ms)
+- No impact on regular file operations
+
+## Contributing
+
+When adding new tests:
+1. Add test cases to the appropriate test file
+2. Update TEST_COVERAGE.md
+3. Run the full test suite to ensure no regressions
+4. Update this README if adding new functionality
+
+## References
+
+- [PyArrow Documentation](https://arrow.apache.org/docs/python/parquet.html)
+- [s3fs Documentation](https://s3fs.readthedocs.io/)
+- [SeaweedFS S3 API](https://github.com/seaweedfs/seaweedfs/wiki/Amazon-S3-API)
+- [AWS S3 API Reference](https://docs.aws.amazon.com/AmazonS3/latest/API/)
+
+---
+
+**Last Updated**: November 19, 2025
+**Status**: All tests passing ✅
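The handler change itself lives outside this diff (in `weed/s3api/s3api_object_handlers.go`), so the following is a minimal, self-contained Go sketch of the decision table described in the README above. It is not the SeaweedFS code: `store`, `Entry`, `lookup`, and `hasChildren` are hypothetical stand-ins for the filer lookup and the LIST-with-Limit=1 child check that the analysis documents reference.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// Entry is a hypothetical filer entry: object size and whether it is a directory.
type Entry struct {
	Size  int64
	IsDir bool
}

// store is a hypothetical stand-in for the filer: key metadata plus a
// "does anything exist under this prefix?" check (the LIST with Limit=1).
type store struct {
	entries map[string]Entry
}

func (s store) lookup(key string) (Entry, bool) {
	e, ok := s.entries[key]
	return e, ok
}

func (s store) hasChildren(prefix string) bool {
	for k := range s.entries {
		if k != prefix && strings.HasPrefix(k, prefix) {
			return true
		}
	}
	return false
}

// headStatus mirrors the decision table from "Edge Cases Handled" above.
func headStatus(s store, key string) int {
	// Explicit directory markers (trailing slash) keep their 200 behavior.
	if strings.HasSuffix(key, "/") {
		if _, ok := s.lookup(key); ok {
			return http.StatusOK
		}
		return http.StatusNotFound
	}
	entry, ok := s.lookup(key)
	if !ok {
		return http.StatusNotFound
	}
	// Implicit directory: a 0-byte object or directory entry with children,
	// requested without a trailing slash -> 404 so s3fs falls back to LIST.
	if (entry.Size == 0 || entry.IsDir) && s.hasChildren(key+"/") {
		return http.StatusNotFound
	}
	// Regular files and legitimate empty files stay 200.
	return http.StatusOK
}

func main() {
	s := store{entries: map[string]Entry{
		"dataset":                {Size: 0, IsDir: true}, // implicit directory with children
		"dataset/part-0.parquet": {Size: 4096},
		"empty.txt":              {Size: 0},              // legitimate empty file, no children
		"explicit_dir/":          {Size: 0, IsDir: true}, // explicit marker, trailing slash
	}}
	for _, key := range []string{"dataset", "empty.txt", "explicit_dir/", "missing"} {
		fmt.Printf("HEAD %-13s -> %d\n", key, headStatus(s, key))
	}
}
```

As written, the sketch prints 404 for `dataset` and `missing`, and 200 for `empty.txt` and `explicit_dir/`, mirroring the edge-case table above.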
diff --git a/test/s3/parquet/TEST_COVERAGE.md b/test/s3/parquet/TEST_COVERAGE.md new file mode 100644 index 000000000..f08a93ab9 --- /dev/null +++ b/test/s3/parquet/TEST_COVERAGE.md @@ -0,0 +1,46 @@ +# Test Coverage Documentation + +## Overview + +This document provides comprehensive test coverage documentation for the SeaweedFS S3 Parquet integration tests. + +## Test Categories + +### Unit Tests (Go) +- 17 test cases covering S3 API handlers +- Tests for implicit directory handling +- HEAD request behavior validation +- Located in: `weed/s3api/s3api_implicit_directory_test.go` + +### Integration Tests (Python) +- 6 test cases for implicit directory fix +- Tests HEAD request behavior on directory markers +- s3fs directory detection validation +- PyArrow dataset read compatibility +- Located in: `test_implicit_directory_fix.py` + +### End-to-End Tests (Python) +- 20 test cases combining write and read methods +- Small file tests (5 rows): 10 test combinations +- Large file tests (200,000 rows): 10 test combinations +- Tests multiple write methods: `pads.write_dataset`, `pq.write_table+s3fs` +- Tests multiple read methods: `pads.dataset`, `pq.ParquetDataset`, `pq.read_table`, `s3fs+direct`, `s3fs+buffered` +- Located in: `s3_parquet_test.py` + +## Coverage Summary + +| Test Type | Count | Status | +|-----------|-------|--------| +| Unit Tests (Go) | 17 | ā
Pass |
+| Integration Tests (Python) | 6 | ✅ Pass |
+| End-to-End Tests (Python) | 20 | ✅ Pass |
+| **Total** | **43** | **✅
All Pass** | + +## TODO + +- [ ] Add detailed test execution time metrics +- [ ] Document test data generation strategies +- [ ] Add code coverage percentages for Go tests +- [ ] Document edge cases and corner cases tested +- [ ] Add performance benchmarking results + diff --git a/test/s3/parquet/requirements.txt b/test/s3/parquet/requirements.txt new file mode 100644 index 000000000..e92a7cd70 --- /dev/null +++ b/test/s3/parquet/requirements.txt @@ -0,0 +1,7 @@ +# Python dependencies for S3 Parquet tests +# Install with: pip install -r requirements.txt + +pyarrow>=10.0.0 +s3fs>=2023.12.0 +boto3>=1.28.0 + diff --git a/test/s3/parquet/s3_parquet_test.py b/test/s3/parquet/s3_parquet_test.py new file mode 100755 index 000000000..35ff0bcde --- /dev/null +++ b/test/s3/parquet/s3_parquet_test.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 +""" +Test script for S3-compatible storage with PyArrow Parquet files. + +This script tests different write methods (PyArrow write_dataset vs. pq.write_table to buffer) +combined with different read methods (PyArrow dataset, direct s3fs read, buffered read) to +identify which combinations work with large files that span multiple row groups. + +This test specifically addresses issues with large tables using PyArrow where files span +multiple row-groups (default row_group size is around 130,000 rows). + +Requirements: + - pyarrow>=22 + - s3fs>=2024.12.0 + +Environment Variables: + S3_ENDPOINT_URL: S3 endpoint (default: http://localhost:8333) + S3_ACCESS_KEY: S3 access key (default: some_access_key1) + S3_SECRET_KEY: S3 secret key (default: some_secret_key1) + BUCKET_NAME: S3 bucket name (default: test-parquet-bucket) + TEST_QUICK: Run only small/quick tests (default: 0, set to 1 for quick mode) + +Usage: + # Run with default environment variables + python3 s3_parquet_test.py + + # Run with custom environment variables + S3_ENDPOINT_URL=http://localhost:8333 \ + S3_ACCESS_KEY=mykey \ + S3_SECRET_KEY=mysecret \ + BUCKET_NAME=mybucket \ + python3 s3_parquet_test.py +""" + +import io +import logging +import os +import secrets +import sys +import traceback +from datetime import datetime +from typing import Tuple + +import pyarrow as pa +import pyarrow.dataset as pads +import pyarrow.parquet as pq + +try: + import s3fs +except ImportError: + logging.error("s3fs not installed. 
Install with: pip install s3fs") + sys.exit(1) + +logging.basicConfig(level=logging.INFO, format="%(message)s") + +# Error log file +ERROR_LOG_FILE = f"s3_parquet_test_errors_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + +# Configuration from environment variables with defaults +S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "http://localhost:8333") +S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "some_access_key1") +S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "some_secret_key1") +BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket") +TEST_QUICK = os.getenv("TEST_QUICK", "0") == "1" + +# Create randomized test directory +TEST_RUN_ID = secrets.token_hex(8) +TEST_DIR = f"{BUCKET_NAME}/parquet-tests/{TEST_RUN_ID}" + +# Test file sizes +TEST_SIZES = { + "small": 5, + "large": 200_000, # This will create multiple row groups +} + +# Filter to only small tests if quick mode is enabled +if TEST_QUICK: + TEST_SIZES = {"small": TEST_SIZES["small"]} + logging.info("Quick test mode enabled - running only small tests") + + +def create_sample_table(num_rows: int = 5) -> pa.Table: + """Create a sample PyArrow table for testing.""" + return pa.table({ + "id": pa.array(range(num_rows), type=pa.int64()), + "name": pa.array([f"user_{i}" for i in range(num_rows)], type=pa.string()), + "value": pa.array([float(i) * 1.5 for i in range(num_rows)], type=pa.float64()), + "flag": pa.array([i % 2 == 0 for i in range(num_rows)], type=pa.bool_()), + }) + + +def log_error(operation: str, short_msg: str) -> None: + """Log error details to file with full traceback.""" + with open(ERROR_LOG_FILE, "a") as f: + f.write(f"\n{'='*80}\n") + f.write(f"Operation: {operation}\n") + f.write(f"Time: {datetime.now().isoformat()}\n") + f.write(f"Message: {short_msg}\n") + f.write("Full Traceback:\n") + f.write(traceback.format_exc()) + f.write(f"{'='*80}\n") + + +def init_s3fs() -> s3fs.S3FileSystem: + """Initialize and return S3FileSystem.""" + logging.info("Initializing S3FileSystem...") + logging.info(f" Endpoint: {S3_ENDPOINT_URL}") + logging.info(f" Bucket: {BUCKET_NAME}") + try: + fs = s3fs.S3FileSystem( + client_kwargs={"endpoint_url": S3_ENDPOINT_URL}, + key=S3_ACCESS_KEY, + secret=S3_SECRET_KEY, + use_listings_cache=False, + ) + logging.info("ā S3FileSystem initialized successfully\n") + return fs + except Exception: + logging.exception("ā Failed to initialize S3FileSystem") + raise + + +def ensure_bucket_exists(fs: s3fs.S3FileSystem) -> None: + """Ensure the test bucket exists.""" + try: + if not fs.exists(BUCKET_NAME): + logging.info(f"Creating bucket: {BUCKET_NAME}") + fs.mkdir(BUCKET_NAME) + logging.info(f"ā Bucket created: {BUCKET_NAME}") + else: + logging.info(f"ā Bucket exists: {BUCKET_NAME}") + except Exception: + logging.exception("ā Failed to create/check bucket") + raise + + +# Write Methods + +def write_with_pads(table: pa.Table, path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str]: + """Write using pads.write_dataset with filesystem parameter.""" + try: + pads.write_dataset(table, path, format="parquet", filesystem=fs) + return True, "pads.write_dataset" + except Exception as e: + error_msg = f"pads.write_dataset: {type(e).__name__}" + log_error("write_with_pads", error_msg) + return False, error_msg + + +def write_with_buffer_and_s3fs(table: pa.Table, path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str]: + """Write using pq.write_table to buffer, then upload via s3fs.""" + try: + buffer = io.BytesIO() + pq.write_table(table, buffer) + buffer.seek(0) + with fs.open(path, "wb") as f: + 
f.write(buffer.read()) + return True, "pq.write_table+s3fs.open" + except Exception as e: + error_msg = f"pq.write_table+s3fs.open: {type(e).__name__}" + log_error("write_with_buffer_and_s3fs", error_msg) + return False, error_msg + + +# Read Methods + +def get_parquet_files(path: str, fs: s3fs.S3FileSystem) -> list: + """ + Helper to discover all parquet files for a given path. + + Args: + path: S3 path (file or directory) + fs: S3FileSystem instance + + Returns: + List of parquet file paths + + Raises: + ValueError: If no parquet files are found in a directory + """ + if fs.isdir(path): + # Find all parquet files in the directory + files = [f for f in fs.ls(path) if f.endswith('.parquet')] + if not files: + raise ValueError(f"No parquet files found in directory: {path}") + return files + else: + # Single file path + return [path] + + +def read_with_pads_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: + """Read using pads.dataset - handles both single files and directories.""" + try: + # pads.dataset() should auto-discover parquet files in the directory + dataset = pads.dataset(path, format="parquet", filesystem=fs) + result = dataset.to_table() + return True, "pads.dataset", result.num_rows + except Exception as e: + error_msg = f"pads.dataset: {type(e).__name__}" + log_error("read_with_pads_dataset", error_msg) + return False, error_msg, 0 + + +def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: + """Read directly via s3fs.open() streaming.""" + try: + # Get all parquet files (handles both single file and directory) + parquet_files = get_parquet_files(path, fs) + + # Read all parquet files and concatenate them + tables = [] + for file_path in parquet_files: + with fs.open(file_path, "rb") as f: + table = pq.read_table(f) + tables.append(table) + + # Concatenate all tables into one + if len(tables) == 1: + result = tables[0] + else: + result = pa.concat_tables(tables) + + return True, "s3fs.open+pq.read_table", result.num_rows + except Exception as e: + error_msg = f"s3fs.open+pq.read_table: {type(e).__name__}" + log_error("read_direct_s3fs", error_msg) + return False, error_msg, 0 + + +def read_buffered_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: + """Read via s3fs.open() into buffer, then pq.read_table.""" + try: + # Get all parquet files (handles both single file and directory) + parquet_files = get_parquet_files(path, fs) + + # Read all parquet files and concatenate them + tables = [] + for file_path in parquet_files: + with fs.open(file_path, "rb") as f: + buffer = io.BytesIO(f.read()) + buffer.seek(0) + table = pq.read_table(buffer) + tables.append(table) + + # Concatenate all tables into one + if len(tables) == 1: + result = tables[0] + else: + result = pa.concat_tables(tables) + + return True, "s3fs.open+BytesIO+pq.read_table", result.num_rows + except Exception as e: + error_msg = f"s3fs.open+BytesIO+pq.read_table: {type(e).__name__}" + log_error("read_buffered_s3fs", error_msg) + return False, error_msg, 0 + + +def read_with_parquet_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: + """Read using pq.ParquetDataset - designed for directories.""" + try: + # ParquetDataset is specifically designed to handle directories + dataset = pq.ParquetDataset(path, filesystem=fs) + result = dataset.read() + return True, "pq.ParquetDataset", result.num_rows + except Exception as e: + error_msg = f"pq.ParquetDataset: {type(e).__name__}" + log_error("read_with_parquet_dataset", error_msg) + return False, 
error_msg, 0 + + +def read_with_pq_read_table(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: + """Read using pq.read_table with filesystem parameter.""" + try: + # pq.read_table() with filesystem should handle directories + result = pq.read_table(path, filesystem=fs) + return True, "pq.read_table+filesystem", result.num_rows + except Exception as e: + error_msg = f"pq.read_table+filesystem: {type(e).__name__}" + log_error("read_with_pq_read_table", error_msg) + return False, error_msg, 0 + + +def test_combination( + fs: s3fs.S3FileSystem, + test_name: str, + write_func, + read_func, + num_rows: int, +) -> Tuple[bool, str]: + """Test a specific write/read combination.""" + table = create_sample_table(num_rows=num_rows) + path = f"{TEST_DIR}/{test_name}/data.parquet" + + # Write + write_ok, write_msg = write_func(table, path, fs) + if not write_ok: + return False, f"WRITE_FAIL: {write_msg}" + + # Read + read_ok, read_msg, rows_read = read_func(path, fs) + if not read_ok: + return False, f"READ_FAIL: {read_msg}" + + # Verify + if rows_read != num_rows: + return False, f"DATA_MISMATCH: expected {num_rows}, got {rows_read}" + + return True, f"{write_msg} + {read_msg}" + + +def cleanup_test_files(fs: s3fs.S3FileSystem) -> None: + """Clean up test files from S3.""" + try: + if fs.exists(TEST_DIR): + logging.info(f"Cleaning up test directory: {TEST_DIR}") + fs.rm(TEST_DIR, recursive=True) + logging.info("ā Test directory cleaned up") + except Exception as e: + logging.warning(f"Failed to cleanup test directory: {e}") + + +def main(): + """Run all write/read method combinations.""" + print("=" * 80) + print("Write/Read Method Combination Tests for S3-Compatible Storage") + print("Testing PyArrow Parquet Files with Multiple Row Groups") + if TEST_QUICK: + print("*** QUICK TEST MODE - Small files only ***") + print("=" * 80 + "\n") + + print("Configuration:") + print(f" S3 Endpoint: {S3_ENDPOINT_URL}") + print(f" Bucket: {BUCKET_NAME}") + print(f" Test Directory: {TEST_DIR}") + print(f" Quick Mode: {'Yes (small files only)' if TEST_QUICK else 'No (all file sizes)'}") + print() + + try: + fs = init_s3fs() + ensure_bucket_exists(fs) + except Exception as e: + print(f"Cannot proceed without S3 connection: {e}") + return 1 + + # Define all write methods + write_methods = [ + ("pads", write_with_pads), + ("buffer+s3fs", write_with_buffer_and_s3fs), + ] + + # Define all read methods + read_methods = [ + ("pads.dataset", read_with_pads_dataset), + ("pq.ParquetDataset", read_with_parquet_dataset), + ("pq.read_table", read_with_pq_read_table), + ("s3fs+direct", read_direct_s3fs), + ("s3fs+buffered", read_buffered_s3fs), + ] + + results = [] + + # Test all combinations for each file size + for size_name, num_rows in TEST_SIZES.items(): + print(f"\n{'='*80}") + print(f"Testing with {size_name} files ({num_rows:,} rows)") + print(f"{'='*80}\n") + print(f"{'Write Method':<20} | {'Read Method':<20} | {'Result':<40}") + print("-" * 85) + + for write_name, write_func in write_methods: + for read_name, read_func in read_methods: + test_name = f"{size_name}_{write_name}_{read_name}" + success, message = test_combination( + fs, test_name, write_func, read_func, num_rows + ) + results.append((test_name, success, message)) + status = "ā PASS" if success else "ā FAIL" + print(f"{write_name:<20} | {read_name:<20} | {status}: {message[:35]}") + + # Summary + print("\n" + "=" * 80) + print("SUMMARY") + print("=" * 80) + passed = sum(1 for _, success, _ in results if success) + total = len(results) + 
print(f"\nTotal: {passed}/{total} passed\n") + + # Group results by file size + for size_name in TEST_SIZES.keys(): + size_results = [r for r in results if size_name in r[0]] + size_passed = sum(1 for _, success, _ in size_results if success) + print(f"{size_name.upper()}: {size_passed}/{len(size_results)} passed") + + print("\n" + "=" * 80) + if passed == total: + print("ā ALL TESTS PASSED!") + else: + print(f"ā {total - passed} test(s) failed") + print("\nFailing combinations:") + for name, success, message in results: + if not success: + parts = name.split("_") + size = parts[0] + write = parts[1] + read = "_".join(parts[2:]) + print(f" - {size:6} | {write:15} | {read:20} -> {message[:50]}") + + print("=" * 80 + "\n") + print(f"Error details logged to: {ERROR_LOG_FILE}") + print("=" * 80 + "\n") + + # Cleanup + cleanup_test_files(fs) + + return 0 if passed == total else 1 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/test/s3/parquet/test_implicit_directory_fix.py b/test/s3/parquet/test_implicit_directory_fix.py new file mode 100755 index 000000000..9ac8f0346 --- /dev/null +++ b/test/s3/parquet/test_implicit_directory_fix.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +""" +Test script to verify the implicit directory fix for s3fs compatibility. + +This test verifies that: +1. Implicit directory markers (0-byte objects with children) return 404 on HEAD +2. s3fs correctly identifies them as directories via LIST fallback +3. PyArrow can read datasets created with write_dataset() + +The fix makes SeaweedFS behave like AWS S3 and improves s3fs compatibility. +""" + +import io +import logging +import os +import sys +import traceback + +import pyarrow as pa +import pyarrow.dataset as pads +import pyarrow.parquet as pq +import s3fs +import boto3 +from botocore.exceptions import ClientError + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Configuration +S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "http://localhost:8333") +S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "some_access_key1") +S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "some_secret_key1") +BUCKET_NAME = os.getenv("BUCKET_NAME", "test-implicit-dir") + +def create_sample_table(num_rows: int = 1000) -> pa.Table: + """Create a sample PyArrow table.""" + return pa.table({ + 'id': pa.array(range(num_rows), type=pa.int64()), + 'value': pa.array([f'value_{i}' for i in range(num_rows)], type=pa.string()), + 'score': pa.array([float(i) * 1.5 for i in range(num_rows)], type=pa.float64()), + }) + +def setup_s3(): + """Set up S3 clients.""" + # s3fs client + fs = s3fs.S3FileSystem( + key=S3_ACCESS_KEY, + secret=S3_SECRET_KEY, + client_kwargs={'endpoint_url': S3_ENDPOINT_URL}, + use_ssl=False + ) + + # boto3 client for raw S3 operations + s3_client = boto3.client( + 's3', + endpoint_url=S3_ENDPOINT_URL, + aws_access_key_id=S3_ACCESS_KEY, + aws_secret_access_key=S3_SECRET_KEY, + use_ssl=False + ) + + return fs, s3_client + +def test_implicit_directory_head_behavior(fs, s3_client): + """Test that HEAD on implicit directory markers returns 404.""" + logger.info("\n" + "="*80) + logger.info("TEST 1: Implicit Directory HEAD Behavior") + logger.info("="*80) + + test_path = f"{BUCKET_NAME}/test_implicit_dir" + + # Clean up any existing data + try: + fs.rm(test_path, recursive=True) + except: + pass + + # Create a dataset using PyArrow (creates implicit directory) + logger.info(f"Creating dataset at: 
{test_path}") + table = create_sample_table(1000) + pads.write_dataset(table, test_path, filesystem=fs, format='parquet') + + # List what was created + logger.info("\nFiles created:") + files = fs.ls(test_path, detail=True) + for f in files: + logger.info(f" {f['name']} - size: {f['size']} bytes, type: {f['type']}") + + # Test HEAD request on the directory marker (without trailing slash) + logger.info(f"\nTesting HEAD on: {test_path}") + try: + response = s3_client.head_object(Bucket=BUCKET_NAME, Key='test_implicit_dir') + logger.info(f" HEAD response: {response['ResponseMetadata']['HTTPStatusCode']}") + logger.info(f" Content-Length: {response.get('ContentLength', 'N/A')}") + logger.info(f" Content-Type: {response.get('ContentType', 'N/A')}") + logger.warning(" ā ļø Expected 404, but got 200 - fix may not be working") + return False + except ClientError as e: + if e.response['Error']['Code'] == '404': + logger.info(" ā HEAD returned 404 (expected - implicit directory)") + return True + else: + logger.error(f" ā Unexpected error: {e}") + return False + +def test_s3fs_directory_detection(fs): + """Test that s3fs correctly detects the directory.""" + logger.info("\n" + "="*80) + logger.info("TEST 2: s3fs Directory Detection") + logger.info("="*80) + + test_path = f"{BUCKET_NAME}/test_implicit_dir" + + # Test s3fs.info() + logger.info(f"\nTesting s3fs.info('{test_path}'):") + try: + info = fs.info(test_path) + logger.info(f" Type: {info.get('type', 'N/A')}") + logger.info(f" Size: {info.get('size', 'N/A')}") + + if info.get('type') == 'directory': + logger.info(" ā s3fs correctly identified as directory") + return True + else: + logger.warning(f" ā ļø s3fs identified as: {info.get('type')}") + return False + except Exception as e: + logger.error(f" ā Error: {e}") + return False + +def test_s3fs_isdir(fs): + """Test that s3fs.isdir() works correctly.""" + logger.info("\n" + "="*80) + logger.info("TEST 3: s3fs.isdir() Method") + logger.info("="*80) + + test_path = f"{BUCKET_NAME}/test_implicit_dir" + + logger.info(f"\nTesting s3fs.isdir('{test_path}'):") + try: + is_dir = fs.isdir(test_path) + logger.info(f" Result: {is_dir}") + + if is_dir: + logger.info(" ā s3fs.isdir() correctly returned True") + return True + else: + logger.warning(" ā ļø s3fs.isdir() returned False") + return False + except Exception as e: + logger.error(f" ā Error: {e}") + return False + +def test_pyarrow_dataset_read(fs): + """Test that PyArrow can read the dataset.""" + logger.info("\n" + "="*80) + logger.info("TEST 4: PyArrow Dataset Read") + logger.info("="*80) + + test_path = f"{BUCKET_NAME}/test_implicit_dir" + + logger.info(f"\nReading dataset from: {test_path}") + try: + ds = pads.dataset(test_path, filesystem=fs, format='parquet') + table = ds.to_table() + logger.info(f" ā Successfully read {len(table)} rows") + logger.info(f" Columns: {table.column_names}") + return True + except Exception as e: + logger.error(f" ā Failed to read dataset: {e}") + traceback.print_exc() + return False + +def test_explicit_directory_marker(fs, s3_client): + """Test that explicit directory markers (with trailing slash) still work.""" + logger.info("\n" + "="*80) + logger.info("TEST 5: Explicit Directory Marker (with trailing slash)") + logger.info("="*80) + + # Create an explicit directory marker + logger.info(f"\nCreating explicit directory: {BUCKET_NAME}/explicit_dir/") + try: + s3_client.put_object( + Bucket=BUCKET_NAME, + Key='explicit_dir/', + Body=b'', + ContentType='httpd/unix-directory' + ) + logger.info(" ā Created explicit 
directory marker") + except Exception as e: + logger.error(f" ā Failed to create: {e}") + return False + + # Test HEAD with trailing slash + logger.info(f"\nTesting HEAD on: {BUCKET_NAME}/explicit_dir/") + try: + response = s3_client.head_object(Bucket=BUCKET_NAME, Key='explicit_dir/') + logger.info(f" ā HEAD returned 200 (expected for explicit directory)") + logger.info(f" Content-Type: {response.get('ContentType', 'N/A')}") + return True + except ClientError as e: + logger.error(f" ā HEAD failed: {e}") + return False + +def test_empty_file_not_directory(fs, s3_client): + """Test that legitimate empty files are not treated as directories.""" + logger.info("\n" + "="*80) + logger.info("TEST 6: Empty File (not a directory)") + logger.info("="*80) + + # Create an empty file with text/plain mime type + logger.info(f"\nCreating empty file: {BUCKET_NAME}/empty.txt") + try: + s3_client.put_object( + Bucket=BUCKET_NAME, + Key='empty.txt', + Body=b'', + ContentType='text/plain' + ) + logger.info(" ā Created empty file") + except Exception as e: + logger.error(f" ā Failed to create: {e}") + return False + + # Test HEAD + logger.info(f"\nTesting HEAD on: {BUCKET_NAME}/empty.txt") + try: + response = s3_client.head_object(Bucket=BUCKET_NAME, Key='empty.txt') + logger.info(f" ā HEAD returned 200 (expected for empty file)") + logger.info(f" Content-Type: {response.get('ContentType', 'N/A')}") + + # Verify s3fs doesn't think it's a directory + info = fs.info(f"{BUCKET_NAME}/empty.txt") + if info.get('type') == 'file': + logger.info(" ā s3fs correctly identified as file") + return True + else: + logger.warning(f" ā ļø s3fs identified as: {info.get('type')}") + return False + except Exception as e: + logger.error(f" ā Error: {e}") + return False + +def main(): + """Run all tests.""" + logger.info("="*80) + logger.info("Implicit Directory Fix Test Suite") + logger.info("="*80) + logger.info(f"Endpoint: {S3_ENDPOINT_URL}") + logger.info(f"Bucket: {BUCKET_NAME}") + logger.info("="*80) + + # Set up S3 clients + fs, s3_client = setup_s3() + + # Create bucket if it doesn't exist + try: + s3_client.create_bucket(Bucket=BUCKET_NAME) + logger.info(f"\nā Created bucket: {BUCKET_NAME}") + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code in ['BucketAlreadyOwnedByYou', 'BucketAlreadyExists']: + logger.info(f"\nā Bucket already exists: {BUCKET_NAME}") + else: + logger.error(f"\nā Failed to create bucket: {e}") + return 1 + + # Run tests + results = [] + + results.append(("Implicit Directory HEAD", test_implicit_directory_head_behavior(fs, s3_client))) + results.append(("s3fs Directory Detection", test_s3fs_directory_detection(fs))) + results.append(("s3fs.isdir() Method", test_s3fs_isdir(fs))) + results.append(("PyArrow Dataset Read", test_pyarrow_dataset_read(fs))) + results.append(("Explicit Directory Marker", test_explicit_directory_marker(fs, s3_client))) + results.append(("Empty File Not Directory", test_empty_file_not_directory(fs, s3_client))) + + # Print summary + logger.info("\n" + "="*80) + logger.info("TEST SUMMARY") + logger.info("="*80) + + passed = sum(1 for _, result in results if result) + total = len(results) + + for name, result in results: + status = "ā PASS" if result else "ā FAIL" + logger.info(f"{status}: {name}") + + logger.info("="*80) + logger.info(f"Results: {passed}/{total} tests passed") + logger.info("="*80) + + if passed == total: + logger.info("\nš All tests passed! 
The implicit directory fix is working correctly.") + return 0 + else: + logger.warning(f"\nā ļø {total - passed} test(s) failed. The fix may not be fully working.") + return 1 + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/test/s3/sse/s3_range_headers_test.go b/test/s3/sse/s3_range_headers_test.go new file mode 100644 index 000000000..e54004eb7 --- /dev/null +++ b/test/s3/sse/s3_range_headers_test.go @@ -0,0 +1,104 @@ +package sse_test + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestPlainObjectRangeAndHeadHeaders ensures non-SSE objects advertise correct +// Content-Length and Content-Range information for both HEAD and ranged GETs. +func TestPlainObjectRangeAndHeadHeaders(t *testing.T) { + ctx := context.Background() + + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"range-plain-") + require.NoError(t, err, "failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + // SeaweedFS S3 auto-chunks uploads at 8MiB (see chunkSize in putToFiler). + // Using 16MiB ensures at least two chunks without stressing CI resources. + const chunkSize = 8 * 1024 * 1024 + const objectSize = 2 * chunkSize + objectKey := "plain-range-validation" + testData := generateTestData(objectSize) + + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + }) + require.NoError(t, err, "failed to upload test object") + + t.Run("HeadObject reports accurate Content-Length", func(t *testing.T) { + resp, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "HeadObject request failed") + assert.Equal(t, int64(objectSize), resp.ContentLength, "Content-Length mismatch on HEAD") + assert.Equal(t, "bytes", aws.ToString(resp.AcceptRanges), "Accept-Ranges should advertise bytes") + }) + + t.Run("Range request across chunk boundary", func(t *testing.T) { + // Test range that spans an 8MiB chunk boundary (chunkSize - 1KB to chunkSize + 3KB) + rangeStart := int64(chunkSize - 1024) + rangeEnd := rangeStart + 4096 - 1 + rangeHeader := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd) + + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Range: aws.String(rangeHeader), + }) + require.NoError(t, err, "GetObject range request failed") + defer resp.Body.Close() + + expectedLen := rangeEnd - rangeStart + 1 + assert.Equal(t, expectedLen, resp.ContentLength, "Content-Length must match requested range size") + assert.Equal(t, + fmt.Sprintf("bytes %d-%d/%d", rangeStart, rangeEnd, objectSize), + aws.ToString(resp.ContentRange), + "Content-Range header mismatch") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err, "failed to read range response body") + assert.Equal(t, int(expectedLen), len(body), "actual bytes read mismatch") + assert.Equal(t, testData[rangeStart:rangeEnd+1], body, "range payload mismatch") + }) + + t.Run("Suffix range request", func(t *testing.T) { + const suffixSize = 2048 + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + 
Range: aws.String(fmt.Sprintf("bytes=-%d", suffixSize)), + }) + require.NoError(t, err, "GetObject suffix range request failed") + defer resp.Body.Close() + + expectedStart := int64(objectSize - suffixSize) + expectedEnd := int64(objectSize - 1) + expectedLen := expectedEnd - expectedStart + 1 + + assert.Equal(t, expectedLen, resp.ContentLength, "suffix Content-Length mismatch") + assert.Equal(t, + fmt.Sprintf("bytes %d-%d/%d", expectedStart, expectedEnd, objectSize), + aws.ToString(resp.ContentRange), + "suffix Content-Range mismatch") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err, "failed to read suffix range response body") + assert.Equal(t, int(expectedLen), len(body), "suffix range byte count mismatch") + assert.Equal(t, testData[expectedStart:expectedEnd+1], body, "suffix range payload mismatch") + }) +} diff --git a/test/s3/sse/s3_sse_range_server_test.go b/test/s3/sse/s3_sse_range_server_test.go new file mode 100644 index 000000000..0b02ec62b --- /dev/null +++ b/test/s3/sse/s3_sse_range_server_test.go @@ -0,0 +1,445 @@ +package sse_test + +import ( + "bytes" + "context" + "crypto/sha256" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// signRawHTTPRequest signs a raw HTTP request with AWS Signature V4 +func signRawHTTPRequest(ctx context.Context, req *http.Request, cfg *S3SSETestConfig) error { + // Create credentials + creds := aws.Credentials{ + AccessKeyID: cfg.AccessKey, + SecretAccessKey: cfg.SecretKey, + } + + // Create signer + signer := v4.NewSigner() + + // Calculate payload hash (empty for GET requests) + payloadHash := fmt.Sprintf("%x", sha256.Sum256([]byte{})) + + // Sign the request + err := signer.SignHTTP(ctx, creds, req, payloadHash, "s3", cfg.Region, time.Now()) + if err != nil { + return fmt.Errorf("failed to sign request: %w", err) + } + + return nil +} + +// TestSSECRangeRequestsServerBehavior tests that the server correctly handles Range requests +// for SSE-C encrypted objects by checking actual HTTP response (not SDK-processed response) +func TestSSECRangeRequestsServerBehavior(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"ssec-range-server-") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + sseKey := generateSSECKey() + testData := generateTestData(2048) // 2KB test file + objectKey := "test-range-server-validation" + + // Upload with SSE-C + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(sseKey.KeyB64), + SSECustomerKeyMD5: aws.String(sseKey.KeyMD5), + }) + require.NoError(t, err, "Failed to upload SSE-C object") + + // Test cases for range requests + testCases := []struct { + name string + rangeHeader string + expectedStart int64 + expectedEnd int64 + expectedTotal int64 + }{ + { + name: "First 100 bytes", + rangeHeader: "bytes=0-99", + expectedStart: 0, + expectedEnd: 99, + expectedTotal: 2048, + }, + { + name: "Middle range", + rangeHeader: 
"bytes=500-699", + expectedStart: 500, + expectedEnd: 699, + expectedTotal: 2048, + }, + { + name: "Last 100 bytes", + rangeHeader: "bytes=1948-2047", + expectedStart: 1948, + expectedEnd: 2047, + expectedTotal: 2048, + }, + { + name: "Single byte", + rangeHeader: "bytes=1000-1000", + expectedStart: 1000, + expectedEnd: 1000, + expectedTotal: 2048, + }, + { + name: "AES block boundary crossing", + rangeHeader: "bytes=15-17", + expectedStart: 15, + expectedEnd: 17, + expectedTotal: 2048, + }, + { + name: "Open-ended range", + rangeHeader: "bytes=2000-", + expectedStart: 2000, + expectedEnd: 2047, + expectedTotal: 2048, + }, + { + name: "Suffix range (last 100 bytes)", + rangeHeader: "bytes=-100", + expectedStart: 1948, + expectedEnd: 2047, + expectedTotal: 2048, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Build object URL (Endpoint already includes http://) + objectURL := fmt.Sprintf("%s/%s/%s", + defaultConfig.Endpoint, + bucketName, + objectKey, + ) + + // Create raw HTTP request + req, err := http.NewRequest("GET", objectURL, nil) + require.NoError(t, err, "Failed to create HTTP request") + + // Add Range header + req.Header.Set("Range", tc.rangeHeader) + + // Add SSE-C headers + req.Header.Set("x-amz-server-side-encryption-customer-algorithm", "AES256") + req.Header.Set("x-amz-server-side-encryption-customer-key", sseKey.KeyB64) + req.Header.Set("x-amz-server-side-encryption-customer-key-MD5", sseKey.KeyMD5) + + // Sign the request with AWS Signature V4 + err = signRawHTTPRequest(ctx, req, defaultConfig) + require.NoError(t, err, "Failed to sign HTTP request") + + // Make request with raw HTTP client + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + require.NoError(t, err, "Failed to execute range request") + defer resp.Body.Close() + + // CRITICAL CHECK 1: Status code must be 206 Partial Content + assert.Equal(t, http.StatusPartialContent, resp.StatusCode, + "Server must return 206 Partial Content for range request, got %d", resp.StatusCode) + + // CRITICAL CHECK 2: Content-Range header must be present and correct + expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", + tc.expectedStart, tc.expectedEnd, tc.expectedTotal) + actualContentRange := resp.Header.Get("Content-Range") + assert.Equal(t, expectedContentRange, actualContentRange, + "Content-Range header mismatch") + + // CRITICAL CHECK 3: Content-Length must match requested range size + expectedLength := tc.expectedEnd - tc.expectedStart + 1 + actualLength := resp.ContentLength + assert.Equal(t, expectedLength, actualLength, + "Content-Length mismatch: expected %d, got %d", expectedLength, actualLength) + + // CRITICAL CHECK 4: Actual bytes received from network + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err, "Failed to read response body") + assert.Equal(t, int(expectedLength), len(bodyBytes), + "Actual bytes received from server mismatch: expected %d, got %d", + expectedLength, len(bodyBytes)) + + // CRITICAL CHECK 5: Verify decrypted content matches expected range + expectedData := testData[tc.expectedStart : tc.expectedEnd+1] + assert.Equal(t, expectedData, bodyBytes, + "Decrypted range content doesn't match expected data") + + // Verify SSE-C headers are present in response + assert.Equal(t, "AES256", resp.Header.Get("x-amz-server-side-encryption-customer-algorithm"), + "SSE-C algorithm header missing in range response") + assert.Equal(t, sseKey.KeyMD5, resp.Header.Get("x-amz-server-side-encryption-customer-key-MD5"), + "SSE-C key MD5 header 
missing in range response") + }) + } +} + +// TestSSEKMSRangeRequestsServerBehavior tests server-side Range handling for SSE-KMS +func TestSSEKMSRangeRequestsServerBehavior(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"ssekms-range-server-") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + kmsKeyID := "test-range-key" + testData := generateTestData(4096) // 4KB test file + objectKey := "test-kms-range-server-validation" + + // Upload with SSE-KMS + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + ServerSideEncryption: "aws:kms", + SSEKMSKeyId: aws.String(kmsKeyID), + }) + require.NoError(t, err, "Failed to upload SSE-KMS object") + + // Test various ranges + testCases := []struct { + name string + rangeHeader string + start int64 + end int64 + }{ + {"First KB", "bytes=0-1023", 0, 1023}, + {"Second KB", "bytes=1024-2047", 1024, 2047}, + {"Last KB", "bytes=3072-4095", 3072, 4095}, + {"Unaligned range", "bytes=100-299", 100, 299}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + objectURL := fmt.Sprintf("%s/%s/%s", + defaultConfig.Endpoint, + bucketName, + objectKey, + ) + + req, err := http.NewRequest("GET", objectURL, nil) + require.NoError(t, err) + req.Header.Set("Range", tc.rangeHeader) + + // Sign the request with AWS Signature V4 + err = signRawHTTPRequest(ctx, req, defaultConfig) + require.NoError(t, err, "Failed to sign HTTP request") + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + // Verify 206 status + assert.Equal(t, http.StatusPartialContent, resp.StatusCode, + "SSE-KMS range request must return 206, got %d", resp.StatusCode) + + // Verify Content-Range + expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", tc.start, tc.end, int64(len(testData))) + assert.Equal(t, expectedContentRange, resp.Header.Get("Content-Range")) + + // Verify actual bytes received + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + expectedLength := tc.end - tc.start + 1 + assert.Equal(t, int(expectedLength), len(bodyBytes), + "Actual network bytes mismatch") + + // Verify content + expectedData := testData[tc.start : tc.end+1] + assert.Equal(t, expectedData, bodyBytes) + }) + } +} + +// TestSSES3RangeRequestsServerBehavior tests server-side Range handling for SSE-S3 +func TestSSES3RangeRequestsServerBehavior(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, "sses3-range-server") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + testData := generateTestData(8192) // 8KB test file + objectKey := "test-s3-range-server-validation" + + // Upload with SSE-S3 + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + ServerSideEncryption: "AES256", + }) + require.NoError(t, err, "Failed to upload SSE-S3 object") + + // Test range request + objectURL := fmt.Sprintf("%s/%s/%s", + defaultConfig.Endpoint, + bucketName, + objectKey, + ) 
+ + req, err := http.NewRequest("GET", objectURL, nil) + require.NoError(t, err) + req.Header.Set("Range", "bytes=1000-1999") + + // Sign the request with AWS Signature V4 + err = signRawHTTPRequest(ctx, req, defaultConfig) + require.NoError(t, err, "Failed to sign HTTP request") + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + // Verify server response + assert.Equal(t, http.StatusPartialContent, resp.StatusCode) + assert.Equal(t, "bytes 1000-1999/8192", resp.Header.Get("Content-Range")) + assert.Equal(t, int64(1000), resp.ContentLength) + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, 1000, len(bodyBytes)) + assert.Equal(t, testData[1000:2000], bodyBytes) +} + +// TestSSEMultipartRangeRequestsServerBehavior tests Range requests on multipart encrypted objects +func TestSSEMultipartRangeRequestsServerBehavior(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err) + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"ssec-mp-range-") + require.NoError(t, err) + defer cleanupTestBucket(ctx, client, bucketName) + + sseKey := generateSSECKey() + objectKey := "test-multipart-range-server" + + // Create 10MB test data (2 parts of 5MB each) + partSize := 5 * 1024 * 1024 + part1Data := generateTestData(partSize) + part2Data := generateTestData(partSize) + fullData := append(part1Data, part2Data...) + + // Initiate multipart upload + createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(sseKey.KeyB64), + SSECustomerKeyMD5: aws.String(sseKey.KeyMD5), + }) + require.NoError(t, err) + uploadID := aws.ToString(createResp.UploadId) + + // Upload part 1 + part1Resp, err := client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + PartNumber: aws.Int32(1), + Body: bytes.NewReader(part1Data), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(sseKey.KeyB64), + SSECustomerKeyMD5: aws.String(sseKey.KeyMD5), + }) + require.NoError(t, err) + + // Upload part 2 + part2Resp, err := client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + PartNumber: aws.Int32(2), + Body: bytes.NewReader(part2Data), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(sseKey.KeyB64), + SSECustomerKeyMD5: aws.String(sseKey.KeyMD5), + }) + require.NoError(t, err) + + // Complete multipart upload + _, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + MultipartUpload: &s3types.CompletedMultipartUpload{ + Parts: []s3types.CompletedPart{ + {PartNumber: aws.Int32(1), ETag: part1Resp.ETag}, + {PartNumber: aws.Int32(2), ETag: part2Resp.ETag}, + }, + }, + }) + require.NoError(t, err) + + // Test range that crosses part boundary + objectURL := fmt.Sprintf("%s/%s/%s", + defaultConfig.Endpoint, + bucketName, + objectKey, + ) + + // Range spanning across the part boundary + start := int64(partSize - 1000) + end := int64(partSize + 1000) + + req, err := http.NewRequest("GET", objectURL, nil) + require.NoError(t, err) + 
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end)) + req.Header.Set("x-amz-server-side-encryption-customer-algorithm", "AES256") + req.Header.Set("x-amz-server-side-encryption-customer-key", sseKey.KeyB64) + req.Header.Set("x-amz-server-side-encryption-customer-key-MD5", sseKey.KeyMD5) + + // Sign the request with AWS Signature V4 + err = signRawHTTPRequest(ctx, req, defaultConfig) + require.NoError(t, err, "Failed to sign HTTP request") + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + // Verify server behavior for cross-part range + assert.Equal(t, http.StatusPartialContent, resp.StatusCode, + "Multipart range request must return 206") + + expectedLength := end - start + 1 + assert.Equal(t, expectedLength, resp.ContentLength, + "Content-Length for cross-part range") + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, int(expectedLength), len(bodyBytes), + "Actual bytes for cross-part range") + + // Verify content spans the part boundary correctly + expectedData := fullData[start : end+1] + assert.Equal(t, expectedData, bodyBytes, + "Cross-part range content must be correctly decrypted and assembled") +} |
