Diffstat (limited to 'test/kafka/kafka-client-loadtest')
43 files changed, 7436 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/.dockerignore b/test/kafka/kafka-client-loadtest/.dockerignore new file mode 100644 index 000000000..1354ab263 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/.dockerignore @@ -0,0 +1,3 @@ +# Keep only the Linux binaries +!weed-linux-amd64 +!weed-linux-arm64 diff --git a/test/kafka/kafka-client-loadtest/.gitignore b/test/kafka/kafka-client-loadtest/.gitignore new file mode 100644 index 000000000..ef136a5e2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/.gitignore @@ -0,0 +1,63 @@ +# Binaries +kafka-loadtest +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Go workspace file +go.work + +# Test results and logs +test-results/ +*.log +logs/ + +# Docker volumes and data +data/ +volumes/ + +# Monitoring data +monitoring/prometheus/data/ +monitoring/grafana/data/ + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Environment files +.env +.env.local +.env.*.local + +# Temporary files +tmp/ +temp/ +*.tmp + +# Coverage reports +coverage.html +coverage.out + +# Build artifacts +bin/ +build/ +dist/ diff --git a/test/kafka/kafka-client-loadtest/Dockerfile.loadtest b/test/kafka/kafka-client-loadtest/Dockerfile.loadtest new file mode 100644 index 000000000..ccf7e5e16 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/Dockerfile.loadtest @@ -0,0 +1,49 @@ +# Kafka Client Load Test Runner Dockerfile +# Multi-stage build for cross-platform support + +# Stage 1: Builder +FROM golang:1.24-alpine AS builder + +WORKDIR /app + +# Copy go module files +COPY test/kafka/kafka-client-loadtest/go.mod test/kafka/kafka-client-loadtest/go.sum ./ +RUN go mod download + +# Copy source code +COPY test/kafka/kafka-client-loadtest/ ./ + +# Build the loadtest binary +RUN CGO_ENABLED=0 GOOS=linux go build -o /kafka-loadtest ./cmd/loadtest + +# Stage 2: Runtime +FROM ubuntu:22.04 + +# Install runtime dependencies +RUN apt-get update && apt-get install -y \ + ca-certificates \ + curl \ + jq \ + bash \ + netcat \ + && rm -rf /var/lib/apt/lists/* + +# Copy built binary from builder stage +COPY --from=builder /kafka-loadtest /usr/local/bin/kafka-loadtest +RUN chmod +x /usr/local/bin/kafka-loadtest + +# Copy scripts and configuration +COPY test/kafka/kafka-client-loadtest/scripts/ /scripts/ +COPY test/kafka/kafka-client-loadtest/config/ /config/ + +# Create results directory +RUN mkdir -p /test-results + +# Make scripts executable +RUN chmod +x /scripts/*.sh + +WORKDIR /app + +# Default command runs the comprehensive load test +CMD ["/usr/local/bin/kafka-loadtest", "-config", "/config/loadtest.yaml"] + diff --git a/test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs b/test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs new file mode 100644 index 000000000..cde2e3df1 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs @@ -0,0 +1,37 @@ +# SeaweedFS Runtime Dockerfile for Kafka Client Load Tests +# Optimized for fast builds - binary built locally and copied in +FROM alpine:3.18 + +# Install runtime dependencies +RUN apk add --no-cache \ + ca-certificates \ + wget \ + netcat-openbsd \ + curl \ + tzdata \ + && rm -rf /var/cache/apk/* + +# Copy pre-built SeaweedFS binary (built locally for linux/amd64 or linux/arm64) +# Cache-busting: Use build arg to force layer rebuild on every build +ARG TARGETARCH=arm64 +ARG CACHE_BUST=unknown +RUN echo "Building with cache bust: 
${CACHE_BUST}" +COPY weed-linux-${TARGETARCH} /usr/local/bin/weed +RUN chmod +x /usr/local/bin/weed + +# Create data directory +RUN mkdir -p /data + +# Set timezone +ENV TZ=UTC + +# Health check script +RUN echo '#!/bin/sh' > /usr/local/bin/health-check && \ + echo 'exec "$@"' >> /usr/local/bin/health-check && \ + chmod +x /usr/local/bin/health-check + +VOLUME ["/data"] +WORKDIR /data + +ENTRYPOINT ["/usr/local/bin/weed"] + diff --git a/test/kafka/kafka-client-loadtest/Makefile b/test/kafka/kafka-client-loadtest/Makefile new file mode 100644 index 000000000..362b5c680 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/Makefile @@ -0,0 +1,446 @@ +# Kafka Client Load Test Makefile +# Provides convenient targets for running load tests against SeaweedFS Kafka Gateway + +.PHONY: help build start stop restart clean test quick-test stress-test endurance-test monitor logs status + +# Configuration +DOCKER_COMPOSE := docker compose +PROJECT_NAME := kafka-client-loadtest +CONFIG_FILE := config/loadtest.yaml + +# Build configuration +GOARCH ?= arm64 +GOOS ?= linux + +# Default test parameters +TEST_MODE ?= comprehensive +TEST_DURATION ?= 300s +PRODUCER_COUNT ?= 10 +CONSUMER_COUNT ?= 5 +MESSAGE_RATE ?= 1000 +MESSAGE_SIZE ?= 1024 + +# Colors for output +GREEN := \033[0;32m +YELLOW := \033[0;33m +BLUE := \033[0;34m +NC := \033[0m + +help: ## Show this help message + @echo "Kafka Client Load Test Makefile" + @echo "" + @echo "Available targets:" + @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf " $(BLUE)%-20s$(NC) %s\n", $$1, $$2}' $(MAKEFILE_LIST) + @echo "" + @echo "Environment variables:" + @echo " TEST_MODE Test mode: producer, consumer, comprehensive (default: comprehensive)" + @echo " TEST_DURATION Test duration (default: 300s)" + @echo " PRODUCER_COUNT Number of producers (default: 10)" + @echo " CONSUMER_COUNT Number of consumers (default: 5)" + @echo " MESSAGE_RATE Messages per second per producer (default: 1000)" + @echo " MESSAGE_SIZE Message size in bytes (default: 1024)" + @echo "" + @echo "Examples:" + @echo " make test # Run default comprehensive test" + @echo " make test TEST_DURATION=10m # Run 10-minute test" + @echo " make quick-test # Run quick smoke test (rebuilds gateway)" + @echo " make stress-test # Run high-load stress test" + @echo " make test TEST_MODE=producer # Producer-only test" + @echo " make schema-test # Run schema integration test with Schema Registry" + @echo " make schema-quick-test # Run quick schema test (30s timeout)" + @echo " make schema-loadtest # Run load test with schemas enabled" + @echo " make build-binary # Build SeaweedFS binary locally for Linux" + @echo " make build-gateway # Build Kafka Gateway (builds binary + Docker image)" + @echo " make build-gateway-clean # Build Kafka Gateway with no cache (fresh build)" + +build: ## Build the load test application + @echo "$(BLUE)Building load test application...$(NC)" + $(DOCKER_COMPOSE) build kafka-client-loadtest + @echo "$(GREEN)Build completed$(NC)" + +build-binary: ## Build the SeaweedFS binary locally for Linux + @echo "$(BLUE)Building SeaweedFS binary locally for $(GOOS) $(GOARCH)...$(NC)" + cd ../../.. 
&& \ + CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ + -ldflags="-s -w" \ + -tags "5BytesOffset" \ + -o test/kafka/kafka-client-loadtest/weed-$(GOOS)-$(GOARCH) \ + weed/weed.go + @echo "$(GREEN)Binary build completed: weed-$(GOOS)-$(GOARCH)$(NC)" + +build-gateway: build-binary ## Build the Kafka Gateway with latest changes + @echo "$(BLUE)Building Kafka Gateway Docker image...$(NC)" + CACHE_BUST=$$(date +%s) $(DOCKER_COMPOSE) build kafka-gateway + @echo "$(GREEN)Kafka Gateway build completed$(NC)" + +build-gateway-clean: build-binary ## Build the Kafka Gateway with no cache (force fresh build) + @echo "$(BLUE)Building Kafka Gateway Docker image with no cache...$(NC)" + $(DOCKER_COMPOSE) build --no-cache kafka-gateway + @echo "$(GREEN)Kafka Gateway clean build completed$(NC)" + +setup: ## Set up monitoring and configuration + @echo "$(BLUE)Setting up monitoring configuration...$(NC)" + ./scripts/setup-monitoring.sh + @echo "$(GREEN)Setup completed$(NC)" + +start: build-gateway ## Start the infrastructure services (without load test) + @echo "$(BLUE)Starting SeaweedFS infrastructure...$(NC)" + $(DOCKER_COMPOSE) up -d \ + seaweedfs-master \ + seaweedfs-volume \ + seaweedfs-filer \ + seaweedfs-mq-broker \ + kafka-gateway \ + schema-registry-init \ + schema-registry + @echo "$(GREEN)Infrastructure started$(NC)" + @echo "Waiting for services to be ready..." + ./scripts/wait-for-services.sh wait + @echo "$(GREEN)All services are ready!$(NC)" + +stop: ## Stop all services + @echo "$(BLUE)Stopping all services...$(NC)" + $(DOCKER_COMPOSE) --profile loadtest --profile monitoring down + @echo "$(GREEN)Services stopped$(NC)" + +restart: stop start ## Restart all services + +clean: ## Clean up all resources (containers, volumes, networks, local data) + @echo "$(YELLOW)Warning: This will remove all volumes and data!$(NC)" + @echo "Press Ctrl+C to cancel, or wait 5 seconds to continue..." 
+ @sleep 5 + @echo "$(BLUE)Cleaning up all resources...$(NC)" + $(DOCKER_COMPOSE) --profile loadtest --profile monitoring down -v --remove-orphans + docker system prune -f + @if [ -f "weed-linux-arm64" ]; then \ + echo "$(BLUE)Removing local binary...$(NC)"; \ + rm -f weed-linux-arm64; \ + fi + @if [ -d "data" ]; then \ + echo "$(BLUE)Removing ALL local data directories (including offset state)...$(NC)"; \ + rm -rf data/*; \ + fi + @echo "$(GREEN)Cleanup completed - all data removed$(NC)" + +clean-binary: ## Clean up only the local binary + @echo "$(BLUE)Removing local binary...$(NC)" + @rm -f weed-linux-arm64 + @echo "$(GREEN)Binary cleanup completed$(NC)" + +status: ## Show service status + @echo "$(BLUE)Service Status:$(NC)" + $(DOCKER_COMPOSE) ps + +logs: ## Show logs from all services + $(DOCKER_COMPOSE) logs -f + +test: start ## Run the comprehensive load test + @echo "$(BLUE)Running Kafka client load test...$(NC)" + @echo "Mode: $(TEST_MODE), Duration: $(TEST_DURATION)" + @echo "Producers: $(PRODUCER_COUNT), Consumers: $(CONSUMER_COUNT)" + @echo "Message Rate: $(MESSAGE_RATE) msgs/sec, Size: $(MESSAGE_SIZE) bytes" + @echo "" + @docker rm -f kafka-client-loadtest-runner 2>/dev/null || true + TEST_MODE=$(TEST_MODE) TEST_DURATION=$(TEST_DURATION) PRODUCER_COUNT=$(PRODUCER_COUNT) CONSUMER_COUNT=$(CONSUMER_COUNT) MESSAGE_RATE=$(MESSAGE_RATE) MESSAGE_SIZE=$(MESSAGE_SIZE) VALUE_TYPE=$(VALUE_TYPE) $(DOCKER_COMPOSE) --profile loadtest up --abort-on-container-exit kafka-client-loadtest + @echo "$(GREEN)Load test completed!$(NC)" + @$(MAKE) show-results + +quick-test: build-gateway ## Run a quick smoke test (1 min, low load, WITH schemas) + @echo "$(BLUE)================================================================$(NC)" + @echo "$(BLUE) Quick Test (Low Load, WITH Schema Registry + Avro) $(NC)" + @echo "$(BLUE) - Duration: 1 minute $(NC)" + @echo "$(BLUE) - Load: 1 producer × 10 msg/sec = 10 total msg/sec $(NC)" + @echo "$(BLUE) - Message Type: Avro (with schema encoding) $(NC)" + @echo "$(BLUE) - Schema-First: Registers schemas BEFORE producing $(NC)" + @echo "$(BLUE)================================================================$(NC)" + @echo "" + @$(MAKE) start + @echo "" + @echo "$(BLUE)=== Step 1: Registering schemas in Schema Registry ===$(NC)" + @echo "$(YELLOW)[WARN] IMPORTANT: Schemas MUST be registered before producing Avro messages!$(NC)" + @./scripts/register-schemas.sh full + @echo "$(GREEN)- Schemas registered successfully$(NC)" + @echo "" + @echo "$(BLUE)=== Step 2: Running load test with Avro messages ===$(NC)" + @$(MAKE) test \ + TEST_MODE=comprehensive \ + TEST_DURATION=60s \ + PRODUCER_COUNT=1 \ + CONSUMER_COUNT=1 \ + MESSAGE_RATE=10 \ + MESSAGE_SIZE=256 \ + VALUE_TYPE=avro + @echo "" + @echo "$(GREEN)================================================================$(NC)" + @echo "$(GREEN) Quick Test Complete! 
$(NC)" + @echo "$(GREEN) - Schema Registration $(NC)" + @echo "$(GREEN) - Avro Message Production $(NC)" + @echo "$(GREEN) - Message Consumption $(NC)" + @echo "$(GREEN)================================================================$(NC)" + +standard-test: ## Run a standard load test (2 min, medium load, WITH Schema Registry + Avro) + @echo "$(BLUE)================================================================$(NC)" + @echo "$(BLUE) Standard Test (Medium Load, WITH Schema Registry) $(NC)" + @echo "$(BLUE) - Duration: 2 minutes $(NC)" + @echo "$(BLUE) - Load: 2 producers × 50 msg/sec = 100 total msg/sec $(NC)" + @echo "$(BLUE) - Message Type: Avro (with schema encoding) $(NC)" + @echo "$(BLUE) - IMPORTANT: Schemas registered FIRST in Schema Registry $(NC)" + @echo "$(BLUE)================================================================$(NC)" + @echo "" + @$(MAKE) start + @echo "" + @echo "$(BLUE)=== Step 1: Registering schemas in Schema Registry ===$(NC)" + @echo "$(YELLOW)Note: Schemas MUST be registered before producing Avro messages!$(NC)" + @./scripts/register-schemas.sh full + @echo "$(GREEN)- Schemas registered$(NC)" + @echo "" + @echo "$(BLUE)=== Step 2: Running load test with Avro messages ===$(NC)" + @$(MAKE) test \ + TEST_MODE=comprehensive \ + TEST_DURATION=2m \ + PRODUCER_COUNT=2 \ + CONSUMER_COUNT=2 \ + MESSAGE_RATE=50 \ + MESSAGE_SIZE=512 \ + VALUE_TYPE=avro + @echo "" + @echo "$(GREEN)================================================================$(NC)" + @echo "$(GREEN) Standard Test Complete! $(NC)" + @echo "$(GREEN)================================================================$(NC)" + +stress-test: ## Run a stress test (10 minutes, high load) with schemas + @echo "$(BLUE)Starting stress test with schema registration...$(NC)" + @$(MAKE) start + @echo "$(BLUE)Registering schemas with Schema Registry...$(NC)" + @./scripts/register-schemas.sh full + @echo "$(BLUE)Running stress test with registered schemas...$(NC)" + @$(MAKE) test \ + TEST_MODE=comprehensive \ + TEST_DURATION=10m \ + PRODUCER_COUNT=20 \ + CONSUMER_COUNT=10 \ + MESSAGE_RATE=2000 \ + MESSAGE_SIZE=2048 \ + VALUE_TYPE=avro + +endurance-test: ## Run an endurance test (30 minutes, sustained load) with schemas + @echo "$(BLUE)Starting endurance test with schema registration...$(NC)" + @$(MAKE) start + @echo "$(BLUE)Registering schemas with Schema Registry...$(NC)" + @./scripts/register-schemas.sh full + @echo "$(BLUE)Running endurance test with registered schemas...$(NC)" + @$(MAKE) test \ + TEST_MODE=comprehensive \ + TEST_DURATION=30m \ + PRODUCER_COUNT=10 \ + CONSUMER_COUNT=5 \ + MESSAGE_RATE=1000 \ + MESSAGE_SIZE=1024 \ + VALUE_TYPE=avro + +producer-test: ## Run producer-only load test + @$(MAKE) test TEST_MODE=producer + +consumer-test: ## Run consumer-only load test (requires existing messages) + @$(MAKE) test TEST_MODE=consumer + +register-schemas: start ## Register schemas with Schema Registry + @echo "$(BLUE)Registering schemas with Schema Registry...$(NC)" + @./scripts/register-schemas.sh full + @echo "$(GREEN)Schema registration completed!$(NC)" + +verify-schemas: ## Verify schemas are registered in Schema Registry + @echo "$(BLUE)Verifying schemas in Schema Registry...$(NC)" + @./scripts/register-schemas.sh verify + @echo "$(GREEN)Schema verification completed!$(NC)" + +list-schemas: ## List all registered schemas in Schema Registry + @echo "$(BLUE)Listing registered schemas...$(NC)" + @./scripts/register-schemas.sh list + +cleanup-schemas: ## Clean up test schemas from Schema Registry + @echo 
"$(YELLOW)Cleaning up test schemas...$(NC)" + @./scripts/register-schemas.sh cleanup + @echo "$(GREEN)Schema cleanup completed!$(NC)" + +schema-test: start ## Run schema integration test (with Schema Registry) + @echo "$(BLUE)Running schema integration test...$(NC)" + @echo "Testing Schema Registry integration with schematized topics" + @echo "" + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o schema-test-linux test_schema_integration.go + docker run --rm --network kafka-client-loadtest \ + -v $(PWD)/schema-test-linux:/usr/local/bin/schema-test \ + alpine:3.18 /usr/local/bin/schema-test + @rm -f schema-test-linux + @echo "$(GREEN)Schema integration test completed!$(NC)" + +schema-quick-test: start ## Run quick schema test (lighter version) + @echo "$(BLUE)Running quick schema test...$(NC)" + @echo "Testing basic schema functionality" + @echo "" + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o schema-test-linux test_schema_integration.go + timeout 60s docker run --rm --network kafka-client-loadtest \ + -v $(PWD)/schema-test-linux:/usr/local/bin/schema-test \ + alpine:3.18 /usr/local/bin/schema-test || true + @rm -f schema-test-linux + @echo "$(GREEN)Quick schema test completed!$(NC)" + +simple-schema-test: start ## Run simple schema test (step-by-step) + @echo "$(BLUE)Running simple schema test...$(NC)" + @echo "Step-by-step schema functionality test" + @echo "" + @mkdir -p simple-test + @cp simple_schema_test.go simple-test/main.go + cd simple-test && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ../simple-schema-test-linux . + docker run --rm --network kafka-client-loadtest \ + -v $(PWD)/simple-schema-test-linux:/usr/local/bin/simple-schema-test \ + alpine:3.18 /usr/local/bin/simple-schema-test + @rm -f simple-schema-test-linux + @rm -rf simple-test + @echo "$(GREEN)Simple schema test completed!$(NC)" + +basic-schema-test: start ## Run basic schema test (manual schema handling without Schema Registry) + @echo "$(BLUE)Running basic schema test...$(NC)" + @echo "Testing schema functionality without Schema Registry dependency" + @echo "" + @mkdir -p basic-test + @cp basic_schema_test.go basic-test/main.go + cd basic-test && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ../basic-schema-test-linux . 
+ timeout 60s docker run --rm --network kafka-client-loadtest \ + -v $(PWD)/basic-schema-test-linux:/usr/local/bin/basic-schema-test \ + alpine:3.18 /usr/local/bin/basic-schema-test + @rm -f basic-schema-test-linux + @rm -rf basic-test + @echo "$(GREEN)Basic schema test completed!$(NC)" + +schema-loadtest: start ## Run load test with schemas enabled + @echo "$(BLUE)Running schema-enabled load test...$(NC)" + @echo "Mode: comprehensive with schemas, Duration: 3m" + @echo "Producers: 3, Consumers: 2, Message Rate: 50 msgs/sec" + @echo "" + TEST_MODE=comprehensive \ + TEST_DURATION=3m \ + PRODUCER_COUNT=3 \ + CONSUMER_COUNT=2 \ + MESSAGE_RATE=50 \ + MESSAGE_SIZE=1024 \ + SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ + $(DOCKER_COMPOSE) --profile loadtest up --abort-on-container-exit kafka-client-loadtest + @echo "$(GREEN)Schema load test completed!$(NC)" + @$(MAKE) show-results + +monitor: setup ## Start monitoring stack (Prometheus + Grafana) + @echo "$(BLUE)Starting monitoring stack...$(NC)" + $(DOCKER_COMPOSE) --profile monitoring up -d prometheus grafana + @echo "$(GREEN)Monitoring stack started!$(NC)" + @echo "" + @echo "Access points:" + @echo " Prometheus: http://localhost:9090" + @echo " Grafana: http://localhost:3000 (admin/admin)" + +monitor-stop: ## Stop monitoring stack + @echo "$(BLUE)Stopping monitoring stack...$(NC)" + $(DOCKER_COMPOSE) --profile monitoring stop prometheus grafana + @echo "$(GREEN)Monitoring stack stopped$(NC)" + +test-with-monitoring: monitor start ## Run test with monitoring enabled + @echo "$(BLUE)Running load test with monitoring...$(NC)" + @$(MAKE) test + @echo "" + @echo "$(GREEN)Test completed! Check the monitoring dashboards:$(NC)" + @echo " Prometheus: http://localhost:9090" + @echo " Grafana: http://localhost:3000 (admin/admin)" + +show-results: ## Show test results + @echo "$(BLUE)Test Results Summary:$(NC)" + @if $(DOCKER_COMPOSE) ps -q kafka-client-loadtest-runner >/dev/null 2>&1; then \ + $(DOCKER_COMPOSE) exec -T kafka-client-loadtest-runner curl -s http://localhost:8080/stats 2>/dev/null || echo "Results not available"; \ + else \ + echo "Load test container not running"; \ + fi + @echo "" + @if [ -d "test-results" ]; then \ + echo "Detailed results saved to: test-results/"; \ + ls -la test-results/ 2>/dev/null || true; \ + fi + +health-check: ## Check health of all services + @echo "$(BLUE)Checking service health...$(NC)" + ./scripts/wait-for-services.sh check + +validate-setup: ## Validate the test setup + @echo "$(BLUE)Validating test setup...$(NC)" + @echo "Checking Docker and Docker Compose..." + @docker --version + @docker compose version || docker-compose --version + @echo "" + @echo "Checking configuration file..." + @if [ -f "$(CONFIG_FILE)" ]; then \ + echo "- Configuration file exists: $(CONFIG_FILE)"; \ + else \ + echo "x Configuration file not found: $(CONFIG_FILE)"; \ + exit 1; \ + fi + @echo "" + @echo "Checking scripts..." + @for script in scripts/*.sh; do \ + if [ -x "$$script" ]; then \ + echo "- $$script is executable"; \ + else \ + echo "x $$script is not executable"; \ + fi; \ + done + @echo "$(GREEN)Setup validation completed$(NC)" + +dev-env: ## Set up development environment + @echo "$(BLUE)Setting up development environment...$(NC)" + @echo "Installing Go dependencies..." 
+ go mod download + go mod tidy + @echo "$(GREEN)Development environment ready$(NC)" + +benchmark: ## Run comprehensive benchmarking suite + @echo "$(BLUE)Running comprehensive benchmark suite...$(NC)" + @echo "This will run multiple test scenarios and collect detailed metrics" + @echo "" + @$(MAKE) quick-test + @sleep 10 + @$(MAKE) standard-test + @sleep 10 + @$(MAKE) stress-test + @echo "$(GREEN)Benchmark suite completed!$(NC)" + +# Advanced targets +debug: ## Start services in debug mode with verbose logging + @echo "$(BLUE)Starting services in debug mode...$(NC)" + SEAWEEDFS_LOG_LEVEL=debug \ + KAFKA_LOG_LEVEL=debug \ + $(DOCKER_COMPOSE) up \ + seaweedfs-master \ + seaweedfs-volume \ + seaweedfs-filer \ + seaweedfs-mq-broker \ + kafka-gateway \ + schema-registry + +attach-loadtest: ## Attach to running load test container + $(DOCKER_COMPOSE) exec kafka-client-loadtest-runner /bin/sh + +exec-master: ## Execute shell in SeaweedFS master container + $(DOCKER_COMPOSE) exec seaweedfs-master /bin/sh + +exec-filer: ## Execute shell in SeaweedFS filer container + $(DOCKER_COMPOSE) exec seaweedfs-filer /bin/sh + +exec-gateway: ## Execute shell in Kafka gateway container + $(DOCKER_COMPOSE) exec kafka-gateway /bin/sh + +# Utility targets +ps: status ## Alias for status + +up: start ## Alias for start + +down: stop ## Alias for stop + +# Help is the default target +.DEFAULT_GOAL := help diff --git a/test/kafka/kafka-client-loadtest/README.md b/test/kafka/kafka-client-loadtest/README.md new file mode 100644 index 000000000..4f465a21b --- /dev/null +++ b/test/kafka/kafka-client-loadtest/README.md @@ -0,0 +1,397 @@ +# Kafka Client Load Test for SeaweedFS + +This comprehensive load testing suite validates the SeaweedFS MQ stack using real Kafka client libraries. 
Unlike the existing SMQ tests, this uses actual Kafka clients (`sarama` and `confluent-kafka-go`) to test the complete integration through: + +- **Kafka Clients** → **SeaweedFS Kafka Gateway** → **SeaweedFS MQ Broker** → **SeaweedFS Storage** + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐ +│ Kafka Client │ │ Kafka Gateway │ │ SeaweedFS MQ │ +│ Load Test │───▶│ (Port 9093) │───▶│ Broker │ +│ - Producers │ │ │ │ │ +│ - Consumers │ │ Protocol │ │ Topic Management │ +│ │ │ Translation │ │ Message Storage │ +└─────────────────┘ └──────────────────┘ └─────────────────────┘ + │ + ▼ + ┌─────────────────────┐ + │ SeaweedFS Storage │ + │ - Master │ + │ - Volume Server │ + │ - Filer │ + └─────────────────────┘ +``` + +## Features + +### 🚀 **Multiple Test Modes** +- **Producer-only**: Pure message production testing +- **Consumer-only**: Consumption from existing topics +- **Comprehensive**: Full producer + consumer load testing + +### 📊 **Rich Metrics & Monitoring** +- Prometheus metrics collection +- Grafana dashboards +- Real-time throughput and latency tracking +- Consumer lag monitoring +- Error rate analysis + +### 🔧 **Configurable Test Scenarios** +- **Quick Test**: 1-minute smoke test +- **Standard Test**: 5-minute medium load +- **Stress Test**: 10-minute high load +- **Endurance Test**: 30-minute sustained load +- **Custom**: Fully configurable parameters + +### 📈 **Message Types** +- **JSON**: Structured test messages +- **Avro**: Schema Registry integration +- **Binary**: Raw binary payloads + +### 🛠 **Kafka Client Support** +- **Sarama**: Native Go Kafka client +- **Confluent**: Official Confluent Go client +- Schema Registry integration +- Consumer group management + +## Quick Start + +### Prerequisites +- Docker & Docker Compose +- Make (optional, but recommended) + +### 1. Run Default Test +```bash +make test +``` +This runs a 5-minute comprehensive test with 10 producers and 5 consumers. + +### 2. Quick Smoke Test +```bash +make quick-test +``` +1-minute test with minimal load for validation. + +### 3. Stress Test +```bash +make stress-test +``` +10-minute high-throughput test with 20 producers and 10 consumers. + +### 4. Test with Monitoring +```bash +make test-with-monitoring +``` +Includes Prometheus + Grafana dashboards for real-time monitoring. 
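> Aside (not part of the suite): as a quick sanity check outside the harness, a minimal standalone Sarama producer can talk to the gateway directly. This is a sketch under assumptions: it expects the gateway to be reachable on `localhost:9093` (the port published by `docker-compose.yml` after `make start`), and the topic name is illustrative, derived from the `loadtest-topic` prefix in `config/loadtest.yaml`.

```go
package main

import (
	"log"

	"github.com/IBM/sarama" // same client library the load test uses (see go.mod)
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll // mirrors acks: "all" in config/loadtest.yaml
	cfg.Producer.Return.Successes = true          // required for SyncProducer

	// Gateway address as published by docker-compose (kafka-gateway, port 9093).
	producer, err := sarama.NewSyncProducer([]string{"localhost:9093"}, cfg)
	if err != nil {
		log.Fatalf("connect to kafka gateway: %v", err)
	}
	defer producer.Close()

	// Illustrative topic name; actual topics are created by the load test itself.
	msg := &sarama.ProducerMessage{
		Topic: "loadtest-topic-0",
		Value: sarama.StringEncoder(`{"hello":"seaweedfs"}`),
	}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatalf("send: %v", err)
	}
	log.Printf("delivered to partition %d at offset %d", partition, offset)
}
```

If the message is accepted and an offset comes back, the full path through the gateway to the MQ broker and storage is working, independent of the dockerized test runner.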
+ +## Detailed Usage + +### Manual Control +```bash +# Start infrastructure only +make start + +# Run load test against running infrastructure +make test TEST_MODE=comprehensive TEST_DURATION=10m + +# Stop everything +make stop + +# Clean up all resources +make clean +``` + +### Using Scripts Directly +```bash +# Full control with the main script +./scripts/run-loadtest.sh start -m comprehensive -d 10m --monitoring + +# Check service health +./scripts/wait-for-services.sh check + +# Setup monitoring configurations +./scripts/setup-monitoring.sh +``` + +### Environment Variables +```bash +export TEST_MODE=comprehensive # producer, consumer, comprehensive +export TEST_DURATION=300s # Test duration +export PRODUCER_COUNT=10 # Number of producer instances +export CONSUMER_COUNT=5 # Number of consumer instances +export MESSAGE_RATE=1000 # Messages/second per producer +export MESSAGE_SIZE=1024 # Message size in bytes +export TOPIC_COUNT=5 # Number of topics to create +export PARTITIONS_PER_TOPIC=3 # Partitions per topic + +make test +``` + +## Configuration + +### Main Configuration File +Edit `config/loadtest.yaml` to customize: + +- **Kafka Settings**: Bootstrap servers, security, timeouts +- **Producer Config**: Batching, compression, acknowledgments +- **Consumer Config**: Group settings, fetch parameters +- **Message Settings**: Size, format (JSON/Avro/Binary) +- **Schema Registry**: Avro/Protobuf schema validation +- **Metrics**: Prometheus collection intervals +- **Test Scenarios**: Predefined load patterns + +### Example Custom Configuration +```yaml +test_mode: "comprehensive" +duration: "600s" # 10 minutes + +producers: + count: 15 + message_rate: 2000 + message_size: 2048 + compression_type: "snappy" + acks: "all" + +consumers: + count: 8 + group_prefix: "high-load-group" + max_poll_records: 1000 + +topics: + count: 10 + partitions: 6 + replication_factor: 1 +``` + +## Test Scenarios + +### 1. Producer Performance Test +```bash +make producer-test TEST_DURATION=10m PRODUCER_COUNT=20 MESSAGE_RATE=3000 +``` +Tests maximum message production throughput. + +### 2. Consumer Performance Test +```bash +# First produce messages +make producer-test TEST_DURATION=5m + +# Then test consumption +make consumer-test TEST_DURATION=10m CONSUMER_COUNT=15 +``` + +### 3. Schema Registry Integration +```bash +# Enable schemas in config/loadtest.yaml +schemas: + enabled: true + +make test +``` +Tests Avro message serialization through Schema Registry. + +### 4. 
High Availability Test +```bash +# Test with container restarts during load +make test TEST_DURATION=20m & +sleep 300 +docker restart kafka-gateway +``` + +## Monitoring & Metrics + +### Real-Time Dashboards +When monitoring is enabled: +- **Prometheus**: http://localhost:9090 +- **Grafana**: http://localhost:3000 (admin/admin) + +### Key Metrics Tracked +- **Throughput**: Messages/second, MB/second +- **Latency**: End-to-end message latency percentiles +- **Errors**: Producer/consumer error rates +- **Consumer Lag**: Per-partition lag monitoring +- **Resource Usage**: CPU, memory, disk I/O + +### Grafana Dashboards +- **Kafka Load Test**: Comprehensive test metrics +- **SeaweedFS Cluster**: Storage system health +- **Custom Dashboards**: Extensible monitoring + +## Advanced Features + +### Schema Registry Testing +```bash +# Test Avro message serialization +export KAFKA_VALUE_TYPE=avro +make test +``` + +The load test includes: +- Schema registration +- Avro message encoding/decoding +- Schema evolution testing +- Compatibility validation + +### Multi-Client Testing +The test supports both Sarama and Confluent clients: +```go +// Configure in producer/consumer code +useConfluent := true // Switch client implementation +``` + +### Consumer Group Rebalancing +- Automatic consumer group management +- Partition rebalancing simulation +- Consumer failure recovery testing + +### Chaos Testing +```yaml +chaos: + enabled: true + producer_failure_rate: 0.01 + consumer_failure_rate: 0.01 + network_partition_probability: 0.001 +``` + +## Troubleshooting + +### Common Issues + +#### Services Not Starting +```bash +# Check service health +make health-check + +# View detailed logs +make logs + +# Debug mode +make debug +``` + +#### Low Throughput +- Increase `MESSAGE_RATE` and `PRODUCER_COUNT` +- Adjust `batch_size` and `linger_ms` in config +- Check consumer `max_poll_records` setting + +#### High Latency +- Reduce `linger_ms` for lower latency +- Adjust `acks` setting (0, 1, or "all") +- Monitor consumer lag + +#### Memory Issues +```bash +# Reduce concurrent clients +make test PRODUCER_COUNT=5 CONSUMER_COUNT=3 + +# Adjust message size +make test MESSAGE_SIZE=512 +``` + +### Debug Commands +```bash +# Execute shell in containers +make exec-master +make exec-filer +make exec-gateway + +# Attach to load test +make attach-loadtest + +# View real-time stats +curl http://localhost:8080/stats +``` + +## Development + +### Building from Source +```bash +# Set up development environment +make dev-env + +# Build load test binary +make build + +# Run tests locally (requires Go 1.21+) +cd cmd/loadtest && go run main.go -config ../../config/loadtest.yaml +``` + +### Extending the Tests +1. **Add new message formats** in `internal/producer/` +2. **Add custom metrics** in `internal/metrics/` +3. **Create new test scenarios** in `config/loadtest.yaml` +4. **Add monitoring panels** in `monitoring/grafana/dashboards/` + +### Contributing +1. Fork the repository +2. Create a feature branch +3. Add tests for new functionality +4. Ensure all tests pass: `make test` +5. 
Submit a pull request + +## Performance Benchmarks + +### Expected Performance (on typical hardware) + +| Scenario | Producers | Consumers | Rate (msg/s) | Latency (p95) | +|----------|-----------|-----------|--------------|---------------| +| Quick | 2 | 2 | 200 | <10ms | +| Standard | 5 | 3 | 2,500 | <20ms | +| Stress | 20 | 10 | 40,000 | <50ms | +| Endurance| 10 | 5 | 10,000 | <30ms | + +*Results vary based on hardware, network, and SeaweedFS configuration* + +### Tuning for Maximum Performance +```yaml +producers: + batch_size: 1000 + linger_ms: 10 + compression_type: "lz4" + acks: "1" # Balance between speed and durability + +consumers: + max_poll_records: 5000 + fetch_min_bytes: 1048576 # 1MB + fetch_max_wait_ms: 100 +``` + +## Comparison with Existing Tests + +| Feature | SMQ Tests | **Kafka Client Load Test** | +|---------|-----------|----------------------------| +| Protocol | SMQ (SeaweedFS native) | **Kafka (industry standard)** | +| Clients | SMQ clients | **Real Kafka clients (Sarama, Confluent)** | +| Schema Registry | ❌ | **✅ Full Avro/Protobuf support** | +| Consumer Groups | Basic | **✅ Full Kafka consumer group features** | +| Monitoring | Basic | **✅ Prometheus + Grafana dashboards** | +| Test Scenarios | Limited | **✅ Multiple predefined scenarios** | +| Real-world | Synthetic | **✅ Production-like workloads** | + +This load test provides comprehensive validation of the SeaweedFS Kafka Gateway using real-world Kafka clients and protocols. + +--- + +## Quick Reference + +```bash +# Essential Commands +make help # Show all available commands +make test # Run default comprehensive test +make quick-test # 1-minute smoke test +make stress-test # High-load stress test +make test-with-monitoring # Include Grafana dashboards +make clean # Clean up all resources + +# Monitoring +make monitor # Start Prometheus + Grafana +# → http://localhost:9090 (Prometheus) +# → http://localhost:3000 (Grafana, admin/admin) + +# Advanced +make benchmark # Run full benchmark suite +make health-check # Validate service health +make validate-setup # Check configuration +``` diff --git a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go new file mode 100644 index 000000000..2f435e600 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go @@ -0,0 +1,465 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/consumer" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/producer" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema" +) + +var ( + configFile = flag.String("config", "/config/loadtest.yaml", "Path to configuration file") + testMode = flag.String("mode", "", "Test mode override (producer|consumer|comprehensive)") + duration = flag.Duration("duration", 0, "Test duration override") + help = flag.Bool("help", false, "Show help") +) + +func main() { + flag.Parse() + + if *help { + printHelp() + return + } + + // Load configuration + cfg, err := config.Load(*configFile) + if err != nil { + log.Fatalf("Failed to load configuration: %v", err) + 
} + + // Override configuration with environment variables and flags + cfg.ApplyOverrides(*testMode, *duration) + + // Initialize metrics + metricsCollector := metrics.NewCollector() + + // Start metrics HTTP server + go func() { + http.Handle("/metrics", promhttp.Handler()) + http.HandleFunc("/health", healthCheck) + http.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) { + metricsCollector.WriteStats(w) + }) + + log.Printf("Starting metrics server on :8080") + if err := http.ListenAndServe(":8080", nil); err != nil { + log.Printf("Metrics server error: %v", err) + } + }() + + // Set up signal handling + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + log.Printf("Starting Kafka Client Load Test") + log.Printf("Mode: %s, Duration: %v", cfg.TestMode, cfg.Duration) + log.Printf("Kafka Brokers: %v", cfg.Kafka.BootstrapServers) + log.Printf("Schema Registry: %s", cfg.SchemaRegistry.URL) + log.Printf("Schemas Enabled: %v", cfg.Schemas.Enabled) + + // Register schemas if enabled + if cfg.Schemas.Enabled { + log.Printf("Registering schemas with Schema Registry...") + if err := registerSchemas(cfg); err != nil { + log.Fatalf("Failed to register schemas: %v", err) + } + log.Printf("Schemas registered successfully") + } + + var wg sync.WaitGroup + + // Start test based on mode + var testErr error + switch cfg.TestMode { + case "producer": + testErr = runProducerTest(ctx, cfg, metricsCollector, &wg) + case "consumer": + testErr = runConsumerTest(ctx, cfg, metricsCollector, &wg) + case "comprehensive": + testErr = runComprehensiveTest(ctx, cancel, cfg, metricsCollector, &wg) + default: + log.Fatalf("Unknown test mode: %s", cfg.TestMode) + } + + // If test returned an error (e.g., circuit breaker), exit + if testErr != nil { + log.Printf("Test failed with error: %v", testErr) + cancel() // Cancel context to stop any remaining goroutines + return + } + + // Wait for completion or signal + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-sigCh: + log.Printf("Received shutdown signal, stopping tests...") + cancel() + + // Wait for graceful shutdown with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + select { + case <-done: + log.Printf("All tests completed gracefully") + case <-shutdownCtx.Done(): + log.Printf("Shutdown timeout, forcing exit") + } + case <-done: + log.Printf("All tests completed") + } + + // Print final statistics + log.Printf("Final Test Statistics:") + metricsCollector.PrintSummary() +} + +func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { + log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count) + + errChan := make(chan error, cfg.Producers.Count) + + for i := 0; i < cfg.Producers.Count; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + prod, err := producer.New(cfg, collector, id) + if err != nil { + log.Printf("Failed to create producer %d: %v", id, err) + errChan <- err + return + } + defer prod.Close() + + if err := prod.Run(ctx); err != nil { + log.Printf("Producer %d failed: %v", id, err) + errChan <- err + return + } + }(i) + } + + // Wait for any producer error + select { + case err := <-errChan: + log.Printf("Producer test failed: %v", err) + return err + default: + return nil + } +} + +func runConsumerTest(ctx 
context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { + log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count) + + errChan := make(chan error, cfg.Consumers.Count) + + for i := 0; i < cfg.Consumers.Count; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + cons, err := consumer.New(cfg, collector, id) + if err != nil { + log.Printf("Failed to create consumer %d: %v", id, err) + errChan <- err + return + } + defer cons.Close() + + cons.Run(ctx) + }(i) + } + + // Consumers don't typically return errors in the same way, so just return nil + return nil +} + +func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { + log.Printf("Starting comprehensive test with %d producers and %d consumers", + cfg.Producers.Count, cfg.Consumers.Count) + + errChan := make(chan error, cfg.Producers.Count) + + // Create separate contexts for producers and consumers + producerCtx, producerCancel := context.WithCancel(ctx) + consumerCtx, consumerCancel := context.WithCancel(ctx) + + // Start producers + for i := 0; i < cfg.Producers.Count; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + prod, err := producer.New(cfg, collector, id) + if err != nil { + log.Printf("Failed to create producer %d: %v", id, err) + errChan <- err + return + } + defer prod.Close() + + if err := prod.Run(producerCtx); err != nil { + log.Printf("Producer %d failed: %v", id, err) + errChan <- err + return + } + }(i) + } + + // Wait briefly for producers to start producing messages + // Reduced from 5s to 2s to minimize message backlog + time.Sleep(2 * time.Second) + + // Start consumers + for i := 0; i < cfg.Consumers.Count; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + cons, err := consumer.New(cfg, collector, id) + if err != nil { + log.Printf("Failed to create consumer %d: %v", id, err) + return + } + defer cons.Close() + + cons.Run(consumerCtx) + }(i) + } + + // Check for producer errors + select { + case err := <-errChan: + log.Printf("Comprehensive test failed due to producer error: %v", err) + producerCancel() + consumerCancel() + return err + default: + // No immediate error, continue + } + + // If duration is set, stop producers first, then allow consumers extra time to drain + if cfg.Duration > 0 { + go func() { + timer := time.NewTimer(cfg.Duration) + defer timer.Stop() + + select { + case <-timer.C: + log.Printf("Test duration (%v) reached, stopping producers", cfg.Duration) + producerCancel() + + // Allow consumers extra time to drain remaining messages + // Calculate drain time based on test duration (minimum 60s, up to test duration) + drainTime := 60 * time.Second + if cfg.Duration > drainTime { + drainTime = cfg.Duration // Match test duration for longer tests + } + log.Printf("Allowing %v for consumers to drain remaining messages...", drainTime) + time.Sleep(drainTime) + + log.Printf("Stopping consumers after drain period") + consumerCancel() + cancel() + case <-ctx.Done(): + // Context already cancelled + producerCancel() + consumerCancel() + } + }() + } else { + // No duration set, wait for cancellation and ensure cleanup + go func() { + <-ctx.Done() + producerCancel() + consumerCancel() + }() + } + + return nil +} + +func healthCheck(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "OK") +} + +func printHelp() { + fmt.Printf(`Kafka Client Load Test for SeaweedFS + +Usage: %s [options] + 
+Options: + -config string + Path to configuration file (default "/config/loadtest.yaml") + -mode string + Test mode override (producer|consumer|comprehensive) + -duration duration + Test duration override + -help + Show this help message + +Environment Variables: + KAFKA_BOOTSTRAP_SERVERS Comma-separated list of Kafka brokers + SCHEMA_REGISTRY_URL URL of the Schema Registry + TEST_DURATION Test duration (e.g., "5m", "300s") + TEST_MODE Test mode (producer|consumer|comprehensive) + PRODUCER_COUNT Number of producer instances + CONSUMER_COUNT Number of consumer instances + MESSAGE_RATE Messages per second per producer + MESSAGE_SIZE Message size in bytes + TOPIC_COUNT Number of topics to create + PARTITIONS_PER_TOPIC Number of partitions per topic + VALUE_TYPE Message value type (json/avro/binary) + +Test Modes: + producer - Run only producers (generate load) + consumer - Run only consumers (consume existing messages) + comprehensive - Run both producers and consumers simultaneously + +Example: + %s -config ./config/loadtest.yaml -mode comprehensive -duration 10m + +`, os.Args[0], os.Args[0]) +} + +// registerSchemas registers schemas with Schema Registry for all topics +func registerSchemas(cfg *config.Config) error { + // Wait for Schema Registry to be ready + if err := waitForSchemaRegistry(cfg.SchemaRegistry.URL); err != nil { + return fmt.Errorf("schema registry not ready: %w", err) + } + + // Register schemas for each topic with different formats for variety + topics := cfg.GetTopicNames() + + // Determine schema formats - use different formats for different topics + // This provides comprehensive testing of all schema format variations + for i, topic := range topics { + var schemaFormat string + + // Distribute topics across three schema formats for comprehensive testing + // Format 0: AVRO (default, most common) + // Format 1: JSON (modern, human-readable) + // Format 2: PROTOBUF (efficient binary format) + switch i % 3 { + case 0: + schemaFormat = "AVRO" + case 1: + schemaFormat = "JSON" + case 2: + schemaFormat = "PROTOBUF" + } + + // Allow override from config if specified + if cfg.Producers.SchemaFormat != "" { + schemaFormat = cfg.Producers.SchemaFormat + } + + if err := registerTopicSchema(cfg.SchemaRegistry.URL, topic, schemaFormat); err != nil { + return fmt.Errorf("failed to register schema for topic %s (format: %s): %w", topic, schemaFormat, err) + } + log.Printf("Schema registered for topic %s with format: %s", topic, schemaFormat) + } + + return nil +} + +// waitForSchemaRegistry waits for Schema Registry to be ready +func waitForSchemaRegistry(url string) error { + maxRetries := 30 + for i := 0; i < maxRetries; i++ { + resp, err := http.Get(url + "/subjects") + if err == nil && resp.StatusCode == 200 { + resp.Body.Close() + return nil + } + if resp != nil { + resp.Body.Close() + } + time.Sleep(2 * time.Second) + } + return fmt.Errorf("schema registry not ready after %d retries", maxRetries) +} + +// registerTopicSchema registers a schema for a specific topic +func registerTopicSchema(registryURL, topicName, schemaFormat string) error { + // Determine schema format, default to AVRO + if schemaFormat == "" { + schemaFormat = "AVRO" + } + + var schemaStr string + var schemaType string + + switch strings.ToUpper(schemaFormat) { + case "AVRO": + schemaStr = schema.GetAvroSchema() + schemaType = "AVRO" + case "JSON", "JSON_SCHEMA": + schemaStr = schema.GetJSONSchema() + schemaType = "JSON" + case "PROTOBUF": + schemaStr = schema.GetProtobufSchema() + schemaType = "PROTOBUF" + 
default: + return fmt.Errorf("unsupported schema format: %s", schemaFormat) + } + + schemaReq := map[string]interface{}{ + "schema": schemaStr, + "schemaType": schemaType, + } + + jsonData, err := json.Marshal(schemaReq) + if err != nil { + return err + } + + // Register schema for topic value + subject := topicName + "-value" + url := fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("schema registration failed: status=%d, body=%s", resp.StatusCode, string(body)) + } + + log.Printf("Schema registered for topic %s (format: %s)", topicName, schemaType) + return nil +} diff --git a/test/kafka/kafka-client-loadtest/config/loadtest.yaml b/test/kafka/kafka-client-loadtest/config/loadtest.yaml new file mode 100644 index 000000000..6a453aab9 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/config/loadtest.yaml @@ -0,0 +1,169 @@ +# Kafka Client Load Test Configuration + +# Test execution settings +test_mode: "comprehensive" # producer, consumer, comprehensive +duration: "60s" # Test duration (0 = run indefinitely) - producers will stop at this time, consumers get +120s to drain + +# Kafka cluster configuration +kafka: + bootstrap_servers: + - "kafka-gateway:9093" + # Security settings (if needed) + security_protocol: "PLAINTEXT" # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL + sasl_mechanism: "" # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 + sasl_username: "" + sasl_password: "" + +# Schema Registry configuration +schema_registry: + url: "http://schema-registry:8081" + auth: + username: "" + password: "" + +# Producer configuration +producers: + count: 10 # Number of producer instances + message_rate: 1000 # Messages per second per producer + message_size: 1024 # Message size in bytes + batch_size: 100 # Batch size for batching + linger_ms: 5 # Time to wait for batching + compression_type: "snappy" # none, gzip, snappy, lz4, zstd + acks: "all" # 0, 1, all + retries: 3 + retry_backoff_ms: 100 + request_timeout_ms: 30000 + delivery_timeout_ms: 120000 + + # Message generation settings + key_distribution: "random" # random, sequential, uuid + value_type: "avro" # json, avro, protobuf, binary + schema_format: "" # AVRO, JSON, PROTOBUF - schema registry format (when schemas enabled) + # Leave empty to auto-distribute formats across topics for testing: + # topic-0: AVRO, topic-1: JSON, topic-2: PROTOBUF, topic-3: AVRO, topic-4: JSON + # Set to specific format (e.g. 
"AVRO") to use same format for all topics + include_timestamp: true + include_headers: true + +# Consumer configuration +consumers: + count: 5 # Number of consumer instances + group_prefix: "loadtest-group" # Consumer group prefix + auto_offset_reset: "earliest" # earliest, latest + enable_auto_commit: true + auto_commit_interval_ms: 1000 + session_timeout_ms: 30000 + heartbeat_interval_ms: 3000 + max_poll_records: 500 + max_poll_interval_ms: 300000 + fetch_min_bytes: 1 + fetch_max_bytes: 52428800 # 50MB + fetch_max_wait_ms: 100 # 100ms - very fast polling for concurrent fetches and quick drain + +# Topic configuration +topics: + count: 5 # Number of topics to create/use + prefix: "loadtest-topic" # Topic name prefix + partitions: 4 # Partitions per topic (default: 4) + replication_factor: 1 # Replication factor + cleanup_policy: "delete" # delete, compact + retention_ms: 604800000 # 7 days + segment_ms: 86400000 # 1 day + +# Schema configuration (for Avro/Protobuf tests) +schemas: + enabled: true + registry_timeout_ms: 10000 + + # Test schemas + user_event: + type: "avro" + schema: | + { + "type": "record", + "name": "UserEvent", + "namespace": "com.seaweedfs.test", + "fields": [ + {"name": "user_id", "type": "string"}, + {"name": "event_type", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "properties", "type": {"type": "map", "values": "string"}} + ] + } + + transaction: + type: "avro" + schema: | + { + "type": "record", + "name": "Transaction", + "namespace": "com.seaweedfs.test", + "fields": [ + {"name": "transaction_id", "type": "string"}, + {"name": "amount", "type": "double"}, + {"name": "currency", "type": "string"}, + {"name": "merchant_id", "type": "string"}, + {"name": "timestamp", "type": "long"} + ] + } + +# Metrics and monitoring +metrics: + enabled: true + collection_interval: "10s" + prometheus_port: 8080 + + # What to measure + track_latency: true + track_throughput: true + track_errors: true + track_consumer_lag: true + + # Latency percentiles to track + latency_percentiles: [50, 90, 95, 99, 99.9] + +# Load test scenarios +scenarios: + # Steady state load test + steady_load: + producer_rate: 1000 # messages/sec per producer + ramp_up_time: "30s" + steady_duration: "240s" + ramp_down_time: "30s" + + # Burst load test + burst_load: + base_rate: 500 + burst_rate: 5000 + burst_duration: "10s" + burst_interval: "60s" + + # Gradual ramp test + ramp_test: + start_rate: 100 + end_rate: 2000 + ramp_duration: "300s" + step_duration: "30s" + +# Error injection (for resilience testing) +chaos: + enabled: false + producer_failure_rate: 0.01 # 1% of producers fail randomly + consumer_failure_rate: 0.01 # 1% of consumers fail randomly + network_partition_probability: 0.001 # Network issues + broker_restart_interval: "0s" # Restart brokers periodically (0s = disabled) + +# Output and reporting +output: + results_dir: "/test-results" + export_prometheus: true + export_csv: true + export_json: true + real_time_stats: true + stats_interval: "30s" + +# Logging +logging: + level: "info" # debug, info, warn, error + format: "text" # text, json + enable_kafka_logs: false # Enable Kafka client debug logs
\ No newline at end of file diff --git a/test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml b/test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml new file mode 100644 index 000000000..e3184941b --- /dev/null +++ b/test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml @@ -0,0 +1,46 @@ +version: '3.8' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + hostname: zookeeper + container_name: compare-zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.5.0 + hostname: kafka + container_name: compare-kafka + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_LOG_RETENTION_HOURS: 1 + KAFKA_LOG_SEGMENT_BYTES: 1073741824 + + schema-registry: + image: confluentinc/cp-schema-registry:7.5.0 + hostname: schema-registry + container_name: compare-schema-registry + depends_on: + - kafka + ports: + - "8082:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml new file mode 100644 index 000000000..54b49ecd2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/docker-compose.yml @@ -0,0 +1,316 @@ +# SeaweedFS Kafka Client Load Test +# Tests the full stack: Kafka Clients -> SeaweedFS Kafka Gateway -> SeaweedFS MQ Broker -> Storage + +x-seaweedfs-build: &seaweedfs-build + build: + context: . 
+ dockerfile: Dockerfile.seaweedfs + args: + TARGETARCH: ${GOARCH:-arm64} + CACHE_BUST: ${CACHE_BUST:-latest} + image: kafka-client-loadtest-seaweedfs + +services: + # Schema Registry (for Avro/Protobuf support) + # Using host networking to connect to localhost:9093 (where our gateway advertises) + # WORKAROUND: Schema Registry hangs on empty _schemas topic during bootstrap + # Pre-create the topic first to avoid "wait to catch up" hang + schema-registry-init: + image: confluentinc/cp-kafka:8.0.0 + container_name: loadtest-schema-registry-init + networks: + - kafka-loadtest-net + depends_on: + kafka-gateway: + condition: service_healthy + command: > + bash -c " + echo 'Creating _schemas topic...'; + kafka-topics --create --topic _schemas --partitions 1 --replication-factor 1 --bootstrap-server kafka-gateway:9093 --if-not-exists || exit 0; + echo '_schemas topic created successfully'; + " + + schema-registry: + image: confluentinc/cp-schema-registry:8.0.0 + container_name: loadtest-schema-registry + restart: on-failure:3 + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_HOST_PORT: 8081 + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-gateway:9093' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + SCHEMA_REGISTRY_DEBUG: "true" + SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "full" + SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true" + SCHEMA_REGISTRY_MODE: "READWRITE" + SCHEMA_REGISTRY_GROUP_ID: "schema-registry" + SCHEMA_REGISTRY_KAFKASTORE_GROUP_ID: "schema-registry" + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "PLAINTEXT" + SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: "1" + SCHEMA_REGISTRY_KAFKASTORE_INIT_TIMEOUT: "120000" + SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT: "60000" + SCHEMA_REGISTRY_REQUEST_TIMEOUT_MS: "60000" + SCHEMA_REGISTRY_RETRY_BACKOFF_MS: "1000" + # Force IPv4 to work around Java IPv6 issues + # Enable verbose logging and set reasonable memory limits + KAFKA_OPTS: "-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Addresses=true -Xmx512M -Xms256M" + KAFKA_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/kafka/log4j.properties" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: "INFO" + SCHEMA_REGISTRY_KAFKASTORE_WRITE_TIMEOUT_MS: "60000" + SCHEMA_REGISTRY_KAFKASTORE_INIT_RETRY_BACKOFF_MS: "5000" + SCHEMA_REGISTRY_KAFKASTORE_CONSUMER_AUTO_OFFSET_RESET: "earliest" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"] + interval: 15s + timeout: 10s + retries: 10 + start_period: 30s + depends_on: + schema-registry-init: + condition: service_completed_successfully + kafka-gateway: + condition: service_healthy + networks: + - kafka-loadtest-net + + # SeaweedFS Master (coordinator) + seaweedfs-master: + <<: *seaweedfs-build + container_name: loadtest-seaweedfs-master + ports: + - "9333:9333" + - "19333:19333" + command: + - master + - -ip=seaweedfs-master + - -port=9333 + - -port.grpc=19333 + - -volumeSizeLimitMB=48 + - -defaultReplication=000 + - -garbageThreshold=0.3 + volumes: + - ./data/seaweedfs-master:/data + healthcheck: + test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://seaweedfs-master:9333/cluster/status || exit 1"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 20s + networks: + - kafka-loadtest-net + + # SeaweedFS Volume Server (storage) + seaweedfs-volume: + <<: *seaweedfs-build + container_name: loadtest-seaweedfs-volume + ports: + - "8080:8080" + - "18080:18080" + command: + - volume + - -mserver=seaweedfs-master:9333 + - 
-ip=seaweedfs-volume + - -port=8080 + - -port.grpc=18080 + - -publicUrl=seaweedfs-volume:8080 + - -preStopSeconds=1 + - -compactionMBps=50 + - -max=0 + - -dir=/data + depends_on: + seaweedfs-master: + condition: service_healthy + volumes: + - ./data/seaweedfs-volume:/data + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-volume:8080/status"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 15s + networks: + - kafka-loadtest-net + + # SeaweedFS Filer (metadata) + seaweedfs-filer: + <<: *seaweedfs-build + container_name: loadtest-seaweedfs-filer + ports: + - "8888:8888" + - "18888:18888" + - "18889:18889" + command: + - filer + - -master=seaweedfs-master:9333 + - -ip=seaweedfs-filer + - -port=8888 + - -port.grpc=18888 + - -metricsPort=18889 + - -defaultReplicaPlacement=000 + depends_on: + seaweedfs-master: + condition: service_healthy + seaweedfs-volume: + condition: service_healthy + volumes: + - ./data/seaweedfs-filer:/data + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-filer:8888/"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 15s + networks: + - kafka-loadtest-net + + # SeaweedFS MQ Broker (message handling) + seaweedfs-mq-broker: + <<: *seaweedfs-build + container_name: loadtest-seaweedfs-mq-broker + ports: + - "17777:17777" + - "18777:18777" # pprof profiling port + command: + - mq.broker + - -master=seaweedfs-master:9333 + - -ip=seaweedfs-mq-broker + - -port=17777 + - -logFlushInterval=0 + - -port.pprof=18777 + depends_on: + seaweedfs-filer: + condition: service_healthy + volumes: + - ./data/seaweedfs-mq:/data + healthcheck: + test: ["CMD", "nc", "-z", "localhost", "17777"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 20s + networks: + - kafka-loadtest-net + + # SeaweedFS Kafka Gateway (Kafka protocol compatibility) + kafka-gateway: + <<: *seaweedfs-build + container_name: loadtest-kafka-gateway + ports: + - "9093:9093" + - "10093:10093" # pprof profiling port + command: + - mq.kafka.gateway + - -master=seaweedfs-master:9333 + - -ip=kafka-gateway + - -ip.bind=0.0.0.0 + - -port=9093 + - -default-partitions=4 + - -schema-registry-url=http://schema-registry:8081 + - -port.pprof=10093 + depends_on: + seaweedfs-filer: + condition: service_healthy + seaweedfs-mq-broker: + condition: service_healthy + environment: + - SEAWEEDFS_MASTERS=seaweedfs-master:9333 + # - KAFKA_DEBUG=1 # Enable debug logging for Schema Registry troubleshooting + - KAFKA_ADVERTISED_HOST=kafka-gateway + volumes: + - ./data/kafka-gateway:/data + healthcheck: + test: ["CMD", "nc", "-z", "localhost", "9093"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 45s # Increased to account for 10s startup delay + filer discovery + networks: + - kafka-loadtest-net + + # Kafka Client Load Test Runner + kafka-client-loadtest: + build: + context: ../../.. 
+ dockerfile: test/kafka/kafka-client-loadtest/Dockerfile.loadtest + container_name: kafka-client-loadtest-runner + depends_on: + kafka-gateway: + condition: service_healthy + # schema-registry: + # condition: service_healthy + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka-gateway:9093 + - SCHEMA_REGISTRY_URL=http://schema-registry:8081 + - TEST_DURATION=${TEST_DURATION:-300s} + - PRODUCER_COUNT=${PRODUCER_COUNT:-10} + - CONSUMER_COUNT=${CONSUMER_COUNT:-5} + - MESSAGE_RATE=${MESSAGE_RATE:-1000} + - MESSAGE_SIZE=${MESSAGE_SIZE:-1024} + - TOPIC_COUNT=${TOPIC_COUNT:-5} + - PARTITIONS_PER_TOPIC=${PARTITIONS_PER_TOPIC:-3} + - TEST_MODE=${TEST_MODE:-comprehensive} + - SCHEMAS_ENABLED=true + - VALUE_TYPE=${VALUE_TYPE:-avro} + profiles: + - loadtest + volumes: + - ./test-results:/test-results + networks: + - kafka-loadtest-net + + # Monitoring and Metrics + prometheus: + image: prom/prometheus:latest + container_name: loadtest-prometheus + ports: + - "9090:9090" + volumes: + - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml + - prometheus-data:/prometheus + networks: + - kafka-loadtest-net + profiles: + - monitoring + + grafana: + image: grafana/grafana:latest + container_name: loadtest-grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + volumes: + - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards + - ./monitoring/grafana/provisioning:/etc/grafana/provisioning + - grafana-data:/var/lib/grafana + networks: + - kafka-loadtest-net + profiles: + - monitoring + + # Schema Registry Debug Runner + schema-registry-debug: + build: + context: debug-client + dockerfile: Dockerfile + container_name: schema-registry-debug-runner + depends_on: + kafka-gateway: + condition: service_healthy + networks: + - kafka-loadtest-net + profiles: + - debug + +volumes: + prometheus-data: + grafana-data: + +networks: + kafka-loadtest-net: + driver: bridge + name: kafka-client-loadtest + diff --git a/test/kafka/kafka-client-loadtest/go.mod b/test/kafka/kafka-client-loadtest/go.mod new file mode 100644 index 000000000..6ebbfc396 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/go.mod @@ -0,0 +1,41 @@ +module github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest + +go 1.24.0 + +toolchain go1.24.7 + +require ( + github.com/IBM/sarama v1.46.1 + github.com/linkedin/goavro/v2 v2.14.0 + github.com/prometheus/client_golang v1.23.2 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + github.com/rcrowley/go-metrics 
v0.0.0-20250401214520-65e299d6c5c9 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sys v0.36.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect +) diff --git a/test/kafka/kafka-client-loadtest/go.sum b/test/kafka/kafka-client-loadtest/go.sum new file mode 100644 index 000000000..d1869c0fc --- /dev/null +++ b/test/kafka/kafka-client-loadtest/go.sum @@ -0,0 +1,129 @@ +github.com/IBM/sarama v1.46.1 h1:AlDkvyQm4LKktoQZxv0sbTfH3xukeH7r/UFBbUmFV9M= +github.com/IBM/sarama v1.46.1/go.mod h1:ipyOREIx+o9rMSrrPGLZHGuT0mzecNzKd19Quq+Q8AA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= 
+github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI= +github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/kafka/kafka-client-loadtest/internal/config/config.go b/test/kafka/kafka-client-loadtest/internal/config/config.go new file mode 100644 index 000000000..dd9f6d6b2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/config/config.go @@ -0,0 +1,361 @@ +package config + +import ( + "fmt" + "os" + "strconv" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +// Config represents the complete load test configuration +type Config struct { + TestMode string `yaml:"test_mode"` + Duration time.Duration `yaml:"duration"` + + Kafka KafkaConfig `yaml:"kafka"` + SchemaRegistry SchemaRegistryConfig `yaml:"schema_registry"` + Producers ProducersConfig `yaml:"producers"` + Consumers ConsumersConfig `yaml:"consumers"` + Topics TopicsConfig `yaml:"topics"` + Schemas SchemasConfig `yaml:"schemas"` + Metrics MetricsConfig `yaml:"metrics"` + Scenarios ScenariosConfig `yaml:"scenarios"` + Chaos ChaosConfig `yaml:"chaos"` + Output OutputConfig `yaml:"output"` + Logging LoggingConfig `yaml:"logging"` +} + +type KafkaConfig struct { + BootstrapServers []string `yaml:"bootstrap_servers"` + SecurityProtocol string `yaml:"security_protocol"` + SASLMechanism string `yaml:"sasl_mechanism"` + SASLUsername string `yaml:"sasl_username"` + SASLPassword string `yaml:"sasl_password"` +} + +type SchemaRegistryConfig struct { + URL string `yaml:"url"` + Auth struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + } `yaml:"auth"` +} + +type ProducersConfig struct { + Count int `yaml:"count"` + MessageRate int `yaml:"message_rate"` + MessageSize int `yaml:"message_size"` + BatchSize int `yaml:"batch_size"` + LingerMs int `yaml:"linger_ms"` + CompressionType string `yaml:"compression_type"` + Acks string `yaml:"acks"` + Retries int `yaml:"retries"` + RetryBackoffMs int `yaml:"retry_backoff_ms"` + 
RequestTimeoutMs int `yaml:"request_timeout_ms"` + DeliveryTimeoutMs int `yaml:"delivery_timeout_ms"` + KeyDistribution string `yaml:"key_distribution"` + ValueType string `yaml:"value_type"` // json, avro, protobuf, binary + SchemaFormat string `yaml:"schema_format"` // AVRO, JSON, PROTOBUF (schema registry format) + IncludeTimestamp bool `yaml:"include_timestamp"` + IncludeHeaders bool `yaml:"include_headers"` +} + +type ConsumersConfig struct { + Count int `yaml:"count"` + GroupPrefix string `yaml:"group_prefix"` + AutoOffsetReset string `yaml:"auto_offset_reset"` + EnableAutoCommit bool `yaml:"enable_auto_commit"` + AutoCommitIntervalMs int `yaml:"auto_commit_interval_ms"` + SessionTimeoutMs int `yaml:"session_timeout_ms"` + HeartbeatIntervalMs int `yaml:"heartbeat_interval_ms"` + MaxPollRecords int `yaml:"max_poll_records"` + MaxPollIntervalMs int `yaml:"max_poll_interval_ms"` + FetchMinBytes int `yaml:"fetch_min_bytes"` + FetchMaxBytes int `yaml:"fetch_max_bytes"` + FetchMaxWaitMs int `yaml:"fetch_max_wait_ms"` +} + +type TopicsConfig struct { + Count int `yaml:"count"` + Prefix string `yaml:"prefix"` + Partitions int `yaml:"partitions"` + ReplicationFactor int `yaml:"replication_factor"` + CleanupPolicy string `yaml:"cleanup_policy"` + RetentionMs int64 `yaml:"retention_ms"` + SegmentMs int64 `yaml:"segment_ms"` +} + +type SchemaConfig struct { + Type string `yaml:"type"` + Schema string `yaml:"schema"` +} + +type SchemasConfig struct { + Enabled bool `yaml:"enabled"` + RegistryTimeoutMs int `yaml:"registry_timeout_ms"` + UserEvent SchemaConfig `yaml:"user_event"` + Transaction SchemaConfig `yaml:"transaction"` +} + +type MetricsConfig struct { + Enabled bool `yaml:"enabled"` + CollectionInterval time.Duration `yaml:"collection_interval"` + PrometheusPort int `yaml:"prometheus_port"` + TrackLatency bool `yaml:"track_latency"` + TrackThroughput bool `yaml:"track_throughput"` + TrackErrors bool `yaml:"track_errors"` + TrackConsumerLag bool `yaml:"track_consumer_lag"` + LatencyPercentiles []float64 `yaml:"latency_percentiles"` +} + +type ScenarioConfig struct { + ProducerRate int `yaml:"producer_rate"` + RampUpTime time.Duration `yaml:"ramp_up_time"` + SteadyDuration time.Duration `yaml:"steady_duration"` + RampDownTime time.Duration `yaml:"ramp_down_time"` + BaseRate int `yaml:"base_rate"` + BurstRate int `yaml:"burst_rate"` + BurstDuration time.Duration `yaml:"burst_duration"` + BurstInterval time.Duration `yaml:"burst_interval"` + StartRate int `yaml:"start_rate"` + EndRate int `yaml:"end_rate"` + RampDuration time.Duration `yaml:"ramp_duration"` + StepDuration time.Duration `yaml:"step_duration"` +} + +type ScenariosConfig struct { + SteadyLoad ScenarioConfig `yaml:"steady_load"` + BurstLoad ScenarioConfig `yaml:"burst_load"` + RampTest ScenarioConfig `yaml:"ramp_test"` +} + +type ChaosConfig struct { + Enabled bool `yaml:"enabled"` + ProducerFailureRate float64 `yaml:"producer_failure_rate"` + ConsumerFailureRate float64 `yaml:"consumer_failure_rate"` + NetworkPartitionProbability float64 `yaml:"network_partition_probability"` + BrokerRestartInterval time.Duration `yaml:"broker_restart_interval"` +} + +type OutputConfig struct { + ResultsDir string `yaml:"results_dir"` + ExportPrometheus bool `yaml:"export_prometheus"` + ExportCSV bool `yaml:"export_csv"` + ExportJSON bool `yaml:"export_json"` + RealTimeStats bool `yaml:"real_time_stats"` + StatsInterval time.Duration `yaml:"stats_interval"` +} + +type LoggingConfig struct { + Level string `yaml:"level"` + Format string 
`yaml:"format"` + EnableKafkaLogs bool `yaml:"enable_kafka_logs"` +} + +// Load reads and parses the configuration file +func Load(configFile string) (*Config, error) { + data, err := os.ReadFile(configFile) + if err != nil { + return nil, fmt.Errorf("failed to read config file %s: %w", configFile, err) + } + + var cfg Config + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("failed to parse config file %s: %w", configFile, err) + } + + // Apply default values + cfg.setDefaults() + + // Apply environment variable overrides + cfg.applyEnvOverrides() + + return &cfg, nil +} + +// ApplyOverrides applies command-line flag overrides +func (c *Config) ApplyOverrides(testMode string, duration time.Duration) { + if testMode != "" { + c.TestMode = testMode + } + if duration > 0 { + c.Duration = duration + } +} + +// setDefaults sets default values for optional fields +func (c *Config) setDefaults() { + if c.TestMode == "" { + c.TestMode = "comprehensive" + } + + if len(c.Kafka.BootstrapServers) == 0 { + c.Kafka.BootstrapServers = []string{"kafka-gateway:9093"} + } + + if c.SchemaRegistry.URL == "" { + c.SchemaRegistry.URL = "http://schema-registry:8081" + } + + // Schema support is always enabled since Kafka Gateway now enforces schema-first behavior + c.Schemas.Enabled = true + + if c.Producers.Count == 0 { + c.Producers.Count = 10 + } + + if c.Consumers.Count == 0 { + c.Consumers.Count = 5 + } + + if c.Topics.Count == 0 { + c.Topics.Count = 5 + } + + if c.Topics.Prefix == "" { + c.Topics.Prefix = "loadtest-topic" + } + + if c.Topics.Partitions == 0 { + c.Topics.Partitions = 4 // Default to 4 partitions + } + + if c.Topics.ReplicationFactor == 0 { + c.Topics.ReplicationFactor = 1 // Default to 1 replica + } + + if c.Consumers.GroupPrefix == "" { + c.Consumers.GroupPrefix = "loadtest-group" + } + + if c.Output.ResultsDir == "" { + c.Output.ResultsDir = "/test-results" + } + + if c.Metrics.CollectionInterval == 0 { + c.Metrics.CollectionInterval = 10 * time.Second + } + + if c.Output.StatsInterval == 0 { + c.Output.StatsInterval = 30 * time.Second + } +} + +// applyEnvOverrides applies environment variable overrides +func (c *Config) applyEnvOverrides() { + if servers := os.Getenv("KAFKA_BOOTSTRAP_SERVERS"); servers != "" { + c.Kafka.BootstrapServers = strings.Split(servers, ",") + } + + if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" { + c.SchemaRegistry.URL = url + } + + if mode := os.Getenv("TEST_MODE"); mode != "" { + c.TestMode = mode + } + + if duration := os.Getenv("TEST_DURATION"); duration != "" { + if d, err := time.ParseDuration(duration); err == nil { + c.Duration = d + } + } + + if count := os.Getenv("PRODUCER_COUNT"); count != "" { + if i, err := strconv.Atoi(count); err == nil { + c.Producers.Count = i + } + } + + if count := os.Getenv("CONSUMER_COUNT"); count != "" { + if i, err := strconv.Atoi(count); err == nil { + c.Consumers.Count = i + } + } + + if rate := os.Getenv("MESSAGE_RATE"); rate != "" { + if i, err := strconv.Atoi(rate); err == nil { + c.Producers.MessageRate = i + } + } + + if size := os.Getenv("MESSAGE_SIZE"); size != "" { + if i, err := strconv.Atoi(size); err == nil { + c.Producers.MessageSize = i + } + } + + if count := os.Getenv("TOPIC_COUNT"); count != "" { + if i, err := strconv.Atoi(count); err == nil { + c.Topics.Count = i + } + } + + if partitions := os.Getenv("PARTITIONS_PER_TOPIC"); partitions != "" { + if i, err := strconv.Atoi(partitions); err == nil { + c.Topics.Partitions = i + } + } + + if valueType := 
os.Getenv("VALUE_TYPE"); valueType != "" { + c.Producers.ValueType = valueType + } + + if schemaFormat := os.Getenv("SCHEMA_FORMAT"); schemaFormat != "" { + c.Producers.SchemaFormat = schemaFormat + } + + if enabled := os.Getenv("SCHEMAS_ENABLED"); enabled != "" { + c.Schemas.Enabled = enabled == "true" + } +} + +// GetTopicNames returns the list of topic names to use for testing +func (c *Config) GetTopicNames() []string { + topics := make([]string, c.Topics.Count) + for i := 0; i < c.Topics.Count; i++ { + topics[i] = fmt.Sprintf("%s-%d", c.Topics.Prefix, i) + } + return topics +} + +// GetConsumerGroupNames returns the list of consumer group names +func (c *Config) GetConsumerGroupNames() []string { + groups := make([]string, c.Consumers.Count) + for i := 0; i < c.Consumers.Count; i++ { + groups[i] = fmt.Sprintf("%s-%d", c.Consumers.GroupPrefix, i) + } + return groups +} + +// Validate validates the configuration +func (c *Config) Validate() error { + if c.TestMode != "producer" && c.TestMode != "consumer" && c.TestMode != "comprehensive" { + return fmt.Errorf("invalid test mode: %s", c.TestMode) + } + + if len(c.Kafka.BootstrapServers) == 0 { + return fmt.Errorf("kafka bootstrap servers not specified") + } + + if c.Producers.Count <= 0 && (c.TestMode == "producer" || c.TestMode == "comprehensive") { + return fmt.Errorf("producer count must be greater than 0 for producer or comprehensive tests") + } + + if c.Consumers.Count <= 0 && (c.TestMode == "consumer" || c.TestMode == "comprehensive") { + return fmt.Errorf("consumer count must be greater than 0 for consumer or comprehensive tests") + } + + if c.Topics.Count <= 0 { + return fmt.Errorf("topic count must be greater than 0") + } + + if c.Topics.Partitions <= 0 { + return fmt.Errorf("partitions per topic must be greater than 0") + } + + return nil +} diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go new file mode 100644 index 000000000..e1c4caa41 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -0,0 +1,626 @@ +package consumer + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/IBM/sarama" + "github.com/linkedin/goavro/v2" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" + pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb" + "google.golang.org/protobuf/proto" +) + +// Consumer represents a Kafka consumer for load testing +type Consumer struct { + id int + config *config.Config + metricsCollector *metrics.Collector + saramaConsumer sarama.ConsumerGroup + useConfluent bool // Always false, Sarama only + topics []string + consumerGroup string + avroCodec *goavro.Codec + + // Schema format tracking per topic + schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF) + + // Processing tracking + messagesProcessed int64 + lastOffset map[string]map[int32]int64 + offsetMutex sync.RWMutex +} + +// New creates a new consumer instance +func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) { + consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id) + + c := &Consumer{ + id: id, + config: cfg, + metricsCollector: collector, + topics: cfg.GetTopicNames(), + consumerGroup: consumerGroup, + useConfluent: false, // Use Sarama by default + 
lastOffset: make(map[string]map[int32]int64), + schemaFormats: make(map[string]string), + } + + // Initialize schema formats for each topic (must match producer logic) + // This mirrors the format distribution in cmd/loadtest/main.go registerSchemas() + for i, topic := range c.topics { + var schemaFormat string + if cfg.Producers.SchemaFormat != "" { + // Use explicit config if provided + schemaFormat = cfg.Producers.SchemaFormat + } else { + // Distribute across formats (same as producer) + switch i % 3 { + case 0: + schemaFormat = "AVRO" + case 1: + schemaFormat = "JSON" + case 2: + schemaFormat = "PROTOBUF" + } + } + c.schemaFormats[topic] = schemaFormat + log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat) + } + + // Initialize consumer based on configuration + if c.useConfluent { + if err := c.initConfluentConsumer(); err != nil { + return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err) + } + } else { + if err := c.initSaramaConsumer(); err != nil { + return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err) + } + } + + // Initialize Avro codec if schemas are enabled + if cfg.Schemas.Enabled { + if err := c.initAvroCodec(); err != nil { + return nil, fmt.Errorf("failed to initialize Avro codec: %w", err) + } + } + + log.Printf("Consumer %d initialized for group %s", id, consumerGroup) + return c, nil +} + +// initSaramaConsumer initializes the Sarama consumer group +func (c *Consumer) initSaramaConsumer() error { + config := sarama.NewConfig() + + // Consumer configuration + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest + if c.config.Consumers.AutoOffsetReset == "latest" { + config.Consumer.Offsets.Initial = sarama.OffsetNewest + } + + // Auto commit configuration + config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit + config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond + + // Session and heartbeat configuration + config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond + config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond + + // Fetch configuration + config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes) + config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default) + config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes) + config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond + config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond + + // Channel buffer sizes for concurrent partition consumption + config.ChannelBufferSize = 256 // Increase from default 256 to allow more buffering + + // Enable concurrent partition fetching by increasing the number of broker connections + // This allows Sarama to fetch from multiple partitions in parallel + config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests + + // Version + config.Version = sarama.V2_8_0_0 + + // Create consumer group + consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config) + if err != nil { + return fmt.Errorf("failed to create Sarama consumer group: %w", err) + } + + c.saramaConsumer = consumerGroup + return nil +} + +// initConfluentConsumer initializes the 
Confluent Kafka Go consumer +func (c *Consumer) initConfluentConsumer() error { + // Confluent consumer disabled, using Sarama only + return fmt.Errorf("confluent consumer not enabled") +} + +// initAvroCodec initializes the Avro codec for schema-based messages +func (c *Consumer) initAvroCodec() error { + // Use the LoadTestMessage schema (matches what producer uses) + loadTestSchema := `{ + "type": "record", + "name": "LoadTestMessage", + "namespace": "com.seaweedfs.loadtest", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "producer_id", "type": "int"}, + {"name": "counter", "type": "long"}, + {"name": "user_id", "type": "string"}, + {"name": "event_type", "type": "string"}, + {"name": "properties", "type": {"type": "map", "values": "string"}} + ] + }` + + codec, err := goavro.NewCodec(loadTestSchema) + if err != nil { + return fmt.Errorf("failed to create Avro codec: %w", err) + } + + c.avroCodec = codec + return nil +} + +// Run starts the consumer and consumes messages until the context is cancelled +func (c *Consumer) Run(ctx context.Context) { + log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup) + defer log.Printf("Consumer %d stopped", c.id) + + if c.useConfluent { + c.runConfluentConsumer(ctx) + } else { + c.runSaramaConsumer(ctx) + } +} + +// runSaramaConsumer runs the Sarama consumer group +func (c *Consumer) runSaramaConsumer(ctx context.Context) { + handler := &ConsumerGroupHandler{ + consumer: c, + } + + var wg sync.WaitGroup + + // Start error handler + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case err, ok := <-c.saramaConsumer.Errors(): + if !ok { + return + } + log.Printf("Consumer %d error: %v", c.id, err) + c.metricsCollector.RecordConsumerError() + case <-ctx.Done(): + return + } + } + }() + + // Start consumer group session + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil { + log.Printf("Consumer %d: Error consuming: %v", c.id, err) + c.metricsCollector.RecordConsumerError() + + // Wait before retrying + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + } + } + } + }() + + // Start lag monitoring + wg.Add(1) + go func() { + defer wg.Done() + c.monitorConsumerLag(ctx) + }() + + // Wait for completion + <-ctx.Done() + log.Printf("Consumer %d: Context cancelled, shutting down", c.id) + wg.Wait() +} + +// runConfluentConsumer runs the Confluent consumer +func (c *Consumer) runConfluentConsumer(ctx context.Context) { + // Confluent consumer disabled, using Sarama only + log.Printf("Consumer %d: Confluent consumer not enabled", c.id) +} + +// processMessage processes a consumed message +func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error { + topic := "" + if topicPtr != nil { + topic = *topicPtr + } + + // Update offset tracking + c.updateOffset(topic, partition, offset) + + // Decode message based on topic-specific schema format + var decodedMessage interface{} + var err error + + // Determine schema format for this topic (if schemas are enabled) + var schemaFormat string + if c.config.Schemas.Enabled { + schemaFormat = c.schemaFormats[topic] + if schemaFormat == "" { + // Fallback to config if topic not in map + schemaFormat = c.config.Producers.ValueType + } + } else { + // No schemas, use global value type + schemaFormat = c.config.Producers.ValueType + } + + // 
Decode message based on format + switch schemaFormat { + case "avro", "AVRO": + decodedMessage, err = c.decodeAvroMessage(value) + case "json", "JSON", "JSON_SCHEMA": + decodedMessage, err = c.decodeJSONSchemaMessage(value) + case "protobuf", "PROTOBUF": + decodedMessage, err = c.decodeProtobufMessage(value) + case "binary": + decodedMessage, err = c.decodeBinaryMessage(value) + default: + // Fallback to plain JSON + decodedMessage, err = c.decodeJSONMessage(value) + } + + if err != nil { + return fmt.Errorf("failed to decode message: %w", err) + } + + // Note: Removed artificial delay to allow maximum throughput + // If you need to simulate processing time, add a configurable delay setting + // time.Sleep(time.Millisecond) // Minimal processing delay + + // Record metrics + c.metricsCollector.RecordConsumedMessage(len(value)) + c.messagesProcessed++ + + // Log progress + if c.id == 0 && c.messagesProcessed%1000 == 0 { + log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)", + c.id, c.messagesProcessed, topic, partition, offset) + } + + // Optional: Validate message content (for testing purposes) + if c.config.Chaos.Enabled { + if err := c.validateMessage(decodedMessage); err != nil { + log.Printf("Consumer %d: Message validation failed: %v", c.id, err) + } + } + + return nil +} + +// decodeJSONMessage decodes a JSON message +func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) { + var message map[string]interface{} + if err := json.Unmarshal(value, &message); err != nil { + // DEBUG: Log the raw bytes when JSON parsing fails + log.Printf("Consumer %d: JSON decode failed. Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v", + c.id, len(value), value, string(value), err) + return nil, err + } + return message, nil +} + +// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format) +func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) { + if c.avroCodec == nil { + return nil, fmt.Errorf("Avro codec not initialized") + } + + // Handle Confluent Wire Format when schemas are enabled + var avroData []byte + if c.config.Schemas.Enabled { + if len(value) < 5 { + return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value)) + } + + // Check magic byte (should be 0) + if value[0] != 0 { + return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0]) + } + + // Extract schema ID (bytes 1-4, big-endian) + schemaID := binary.BigEndian.Uint32(value[1:5]) + _ = schemaID // TODO: Could validate schema ID matches expected schema + + // Extract Avro data (bytes 5+) + avroData = value[5:] + } else { + // No wire format, use raw data + avroData = value + } + + native, _, err := c.avroCodec.NativeFromBinary(avroData) + if err != nil { + return nil, fmt.Errorf("failed to decode Avro data: %w", err) + } + + return native, nil +} + +// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format) +func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) { + // Handle Confluent Wire Format when schemas are enabled + var jsonData []byte + if c.config.Schemas.Enabled { + if len(value) < 5 { + return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value)) + } + + // Check magic byte (should be 0) + if value[0] != 0 { + return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0]) + } + + // Extract schema ID (bytes 1-4, big-endian) + schemaID := binary.BigEndian.Uint32(value[1:5]) + _ = schemaID // TODO: 
Could validate schema ID matches expected schema + + // Extract JSON data (bytes 5+) + jsonData = value[5:] + } else { + // No wire format, use raw data + jsonData = value + } + + // Decode JSON + var message map[string]interface{} + if err := json.Unmarshal(jsonData, &message); err != nil { + return nil, fmt.Errorf("failed to decode JSON data: %w", err) + } + + return message, nil +} + +// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format) +func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) { + // Handle Confluent Wire Format when schemas are enabled + var protoData []byte + if c.config.Schemas.Enabled { + if len(value) < 5 { + return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value)) + } + + // Check magic byte (should be 0) + if value[0] != 0 { + return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0]) + } + + // Extract schema ID (bytes 1-4, big-endian) + schemaID := binary.BigEndian.Uint32(value[1:5]) + _ = schemaID // TODO: Could validate schema ID matches expected schema + + // Extract Protobuf data (bytes 5+) + protoData = value[5:] + } else { + // No wire format, use raw data + protoData = value + } + + // Unmarshal protobuf message + var protoMsg pb.LoadTestMessage + if err := proto.Unmarshal(protoData, &protoMsg); err != nil { + return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err) + } + + // Convert to map for consistency with other decoders + return map[string]interface{}{ + "id": protoMsg.Id, + "timestamp": protoMsg.Timestamp, + "producer_id": protoMsg.ProducerId, + "counter": protoMsg.Counter, + "user_id": protoMsg.UserId, + "event_type": protoMsg.EventType, + "properties": protoMsg.Properties, + }, nil +} + +// decodeBinaryMessage decodes a binary message +func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) { + if len(value) < 20 { + return nil, fmt.Errorf("binary message too short") + } + + // Extract fields from the binary format: + // [producer_id:4][counter:8][timestamp:8][random_data:...] 
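+	// The manual bit-shifting below is equivalent to using the already-imported
+	// encoding/binary package, e.g. int64(binary.BigEndian.Uint64(value[4:12]))
+	// for counter and int64(binary.BigEndian.Uint64(value[12:20])) for timestamp.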
+ + producerID := int(value[0])<<24 | int(value[1])<<16 | int(value[2])<<8 | int(value[3]) + + var counter int64 + for i := 0; i < 8; i++ { + counter |= int64(value[4+i]) << (56 - i*8) + } + + var timestamp int64 + for i := 0; i < 8; i++ { + timestamp |= int64(value[12+i]) << (56 - i*8) + } + + return map[string]interface{}{ + "producer_id": producerID, + "counter": counter, + "timestamp": timestamp, + "data_size": len(value), + }, nil +} + +// validateMessage performs basic message validation +func (c *Consumer) validateMessage(message interface{}) error { + // This is a placeholder for message validation logic + // In a real load test, you might validate: + // - Message structure + // - Required fields + // - Data consistency + // - Schema compliance + + if message == nil { + return fmt.Errorf("message is nil") + } + + return nil +} + +// updateOffset updates the last seen offset for lag calculation +func (c *Consumer) updateOffset(topic string, partition int32, offset int64) { + c.offsetMutex.Lock() + defer c.offsetMutex.Unlock() + + if c.lastOffset[topic] == nil { + c.lastOffset[topic] = make(map[int32]int64) + } + c.lastOffset[topic][partition] = offset +} + +// monitorConsumerLag monitors and reports consumer lag +func (c *Consumer) monitorConsumerLag(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.reportConsumerLag() + } + } +} + +// reportConsumerLag calculates and reports consumer lag +func (c *Consumer) reportConsumerLag() { + // This is a simplified lag calculation + // In a real implementation, you would query the broker for high water marks + + c.offsetMutex.RLock() + defer c.offsetMutex.RUnlock() + + for topic, partitions := range c.lastOffset { + for partition, _ := range partitions { + // For simplicity, assume lag is always 0 when we're consuming actively + // In a real test, you would compare against the high water mark + lag := int64(0) + + c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag) + } + } +} + +// Close closes the consumer and cleans up resources +func (c *Consumer) Close() error { + log.Printf("Consumer %d: Closing", c.id) + + if c.saramaConsumer != nil { + return c.saramaConsumer.Close() + } + + return nil +} + +// ConsumerGroupHandler implements sarama.ConsumerGroupHandler +type ConsumerGroupHandler struct { + consumer *Consumer +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { + log.Printf("Consumer %d: Consumer group session setup", h.consumer.id) + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { + log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id) + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages() +func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + msgCount := 0 + for { + select { + case message, ok := <-claim.Messages(): + if !ok { + return nil + } + msgCount++ + + // Process the message + var key []byte + if message.Key != nil { + key = message.Key + } + + if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil { + log.Printf("Consumer %d: Error processing message: %v", 
h.consumer.id, err) + h.consumer.metricsCollector.RecordConsumerError() + + // Add a small delay for schema validation or other processing errors to avoid overloading + // select { + // case <-time.After(100 * time.Millisecond): + // // Continue after brief delay + // case <-session.Context().Done(): + // return nil + // } + } else { + // Mark message as processed + session.MarkMessage(message, "") + } + + case <-session.Context().Done(): + log.Printf("Consumer %d: Session context cancelled for %s[%d]", + h.consumer.id, claim.Topic(), claim.Partition()) + return nil + } + } +} + +// Helper functions + +func joinStrings(strs []string, sep string) string { + if len(strs) == 0 { + return "" + } + + result := strs[0] + for i := 1; i < len(strs); i++ { + result += sep + strs[i] + } + return result +} diff --git a/test/kafka/kafka-client-loadtest/internal/metrics/collector.go b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go new file mode 100644 index 000000000..d6a1edb8e --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go @@ -0,0 +1,353 @@ +package metrics + +import ( + "fmt" + "io" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Collector handles metrics collection for the load test +type Collector struct { + // Atomic counters for thread-safe operations + messagesProduced int64 + messagesConsumed int64 + bytesProduced int64 + bytesConsumed int64 + producerErrors int64 + consumerErrors int64 + + // Latency tracking + latencies []time.Duration + latencyMutex sync.RWMutex + + // Consumer lag tracking + consumerLag map[string]int64 + consumerLagMutex sync.RWMutex + + // Test timing + startTime time.Time + + // Prometheus metrics + prometheusMetrics *PrometheusMetrics +} + +// PrometheusMetrics holds all Prometheus metric definitions +type PrometheusMetrics struct { + MessagesProducedTotal prometheus.Counter + MessagesConsumedTotal prometheus.Counter + BytesProducedTotal prometheus.Counter + BytesConsumedTotal prometheus.Counter + ProducerErrorsTotal prometheus.Counter + ConsumerErrorsTotal prometheus.Counter + + MessageLatencyHistogram prometheus.Histogram + ProducerThroughput prometheus.Gauge + ConsumerThroughput prometheus.Gauge + ConsumerLagGauge *prometheus.GaugeVec + + ActiveProducers prometheus.Gauge + ActiveConsumers prometheus.Gauge +} + +// NewCollector creates a new metrics collector +func NewCollector() *Collector { + return &Collector{ + startTime: time.Now(), + consumerLag: make(map[string]int64), + prometheusMetrics: &PrometheusMetrics{ + MessagesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_messages_produced_total", + Help: "Total number of messages produced", + }), + MessagesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_messages_consumed_total", + Help: "Total number of messages consumed", + }), + BytesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_bytes_produced_total", + Help: "Total bytes produced", + }), + BytesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_bytes_consumed_total", + Help: "Total bytes consumed", + }), + ProducerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_producer_errors_total", + Help: "Total number of producer errors", + }), + ConsumerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: 
"kafka_loadtest_consumer_errors_total", + Help: "Total number of consumer errors", + }), + MessageLatencyHistogram: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_loadtest_message_latency_seconds", + Help: "Message end-to-end latency in seconds", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1ms to ~32s + }), + ProducerThroughput: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_producer_throughput_msgs_per_sec", + Help: "Current producer throughput in messages per second", + }), + ConsumerThroughput: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_consumer_throughput_msgs_per_sec", + Help: "Current consumer throughput in messages per second", + }), + ConsumerLagGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kafka_loadtest_consumer_lag_messages", + Help: "Consumer lag in messages", + }, []string{"consumer_group", "topic", "partition"}), + ActiveProducers: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_active_producers", + Help: "Number of active producers", + }), + ActiveConsumers: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_active_consumers", + Help: "Number of active consumers", + }), + }, + } +} + +// RecordProducedMessage records a successfully produced message +func (c *Collector) RecordProducedMessage(size int, latency time.Duration) { + atomic.AddInt64(&c.messagesProduced, 1) + atomic.AddInt64(&c.bytesProduced, int64(size)) + + c.prometheusMetrics.MessagesProducedTotal.Inc() + c.prometheusMetrics.BytesProducedTotal.Add(float64(size)) + c.prometheusMetrics.MessageLatencyHistogram.Observe(latency.Seconds()) + + // Store latency for percentile calculations + c.latencyMutex.Lock() + c.latencies = append(c.latencies, latency) + // Keep only recent latencies to avoid memory bloat + if len(c.latencies) > 100000 { + c.latencies = c.latencies[50000:] + } + c.latencyMutex.Unlock() +} + +// RecordConsumedMessage records a successfully consumed message +func (c *Collector) RecordConsumedMessage(size int) { + atomic.AddInt64(&c.messagesConsumed, 1) + atomic.AddInt64(&c.bytesConsumed, int64(size)) + + c.prometheusMetrics.MessagesConsumedTotal.Inc() + c.prometheusMetrics.BytesConsumedTotal.Add(float64(size)) +} + +// RecordProducerError records a producer error +func (c *Collector) RecordProducerError() { + atomic.AddInt64(&c.producerErrors, 1) + c.prometheusMetrics.ProducerErrorsTotal.Inc() +} + +// RecordConsumerError records a consumer error +func (c *Collector) RecordConsumerError() { + atomic.AddInt64(&c.consumerErrors, 1) + c.prometheusMetrics.ConsumerErrorsTotal.Inc() +} + +// UpdateConsumerLag updates consumer lag metrics +func (c *Collector) UpdateConsumerLag(consumerGroup, topic string, partition int32, lag int64) { + key := fmt.Sprintf("%s-%s-%d", consumerGroup, topic, partition) + + c.consumerLagMutex.Lock() + c.consumerLag[key] = lag + c.consumerLagMutex.Unlock() + + c.prometheusMetrics.ConsumerLagGauge.WithLabelValues( + consumerGroup, topic, fmt.Sprintf("%d", partition), + ).Set(float64(lag)) +} + +// UpdateThroughput updates throughput gauges +func (c *Collector) UpdateThroughput(producerRate, consumerRate float64) { + c.prometheusMetrics.ProducerThroughput.Set(producerRate) + c.prometheusMetrics.ConsumerThroughput.Set(consumerRate) +} + +// UpdateActiveClients updates active client counts +func (c *Collector) UpdateActiveClients(producers, consumers int) { + c.prometheusMetrics.ActiveProducers.Set(float64(producers)) + 
c.prometheusMetrics.ActiveConsumers.Set(float64(consumers)) +} + +// GetStats returns current statistics +func (c *Collector) GetStats() Stats { + produced := atomic.LoadInt64(&c.messagesProduced) + consumed := atomic.LoadInt64(&c.messagesConsumed) + bytesProduced := atomic.LoadInt64(&c.bytesProduced) + bytesConsumed := atomic.LoadInt64(&c.bytesConsumed) + producerErrors := atomic.LoadInt64(&c.producerErrors) + consumerErrors := atomic.LoadInt64(&c.consumerErrors) + + duration := time.Since(c.startTime) + + // Calculate throughput + producerThroughput := float64(produced) / duration.Seconds() + consumerThroughput := float64(consumed) / duration.Seconds() + + // Calculate latency percentiles + var latencyPercentiles map[float64]time.Duration + c.latencyMutex.RLock() + if len(c.latencies) > 0 { + latencyPercentiles = c.calculatePercentiles(c.latencies) + } + c.latencyMutex.RUnlock() + + // Get consumer lag summary + c.consumerLagMutex.RLock() + totalLag := int64(0) + maxLag := int64(0) + for _, lag := range c.consumerLag { + totalLag += lag + if lag > maxLag { + maxLag = lag + } + } + avgLag := float64(0) + if len(c.consumerLag) > 0 { + avgLag = float64(totalLag) / float64(len(c.consumerLag)) + } + c.consumerLagMutex.RUnlock() + + return Stats{ + Duration: duration, + MessagesProduced: produced, + MessagesConsumed: consumed, + BytesProduced: bytesProduced, + BytesConsumed: bytesConsumed, + ProducerErrors: producerErrors, + ConsumerErrors: consumerErrors, + ProducerThroughput: producerThroughput, + ConsumerThroughput: consumerThroughput, + LatencyPercentiles: latencyPercentiles, + TotalConsumerLag: totalLag, + MaxConsumerLag: maxLag, + AvgConsumerLag: avgLag, + } +} + +// PrintSummary prints a summary of the test statistics +func (c *Collector) PrintSummary() { + stats := c.GetStats() + + fmt.Printf("\n=== Load Test Summary ===\n") + fmt.Printf("Test Duration: %v\n", stats.Duration) + fmt.Printf("\nMessages:\n") + fmt.Printf(" Produced: %d (%.2f MB)\n", stats.MessagesProduced, float64(stats.BytesProduced)/1024/1024) + fmt.Printf(" Consumed: %d (%.2f MB)\n", stats.MessagesConsumed, float64(stats.BytesConsumed)/1024/1024) + fmt.Printf(" Producer Errors: %d\n", stats.ProducerErrors) + fmt.Printf(" Consumer Errors: %d\n", stats.ConsumerErrors) + + fmt.Printf("\nThroughput:\n") + fmt.Printf(" Producer: %.2f msgs/sec\n", stats.ProducerThroughput) + fmt.Printf(" Consumer: %.2f msgs/sec\n", stats.ConsumerThroughput) + + if stats.LatencyPercentiles != nil { + fmt.Printf("\nLatency Percentiles:\n") + percentiles := []float64{50, 90, 95, 99, 99.9} + for _, p := range percentiles { + if latency, exists := stats.LatencyPercentiles[p]; exists { + fmt.Printf(" p%.1f: %v\n", p, latency) + } + } + } + + fmt.Printf("\nConsumer Lag:\n") + fmt.Printf(" Total: %d messages\n", stats.TotalConsumerLag) + fmt.Printf(" Max: %d messages\n", stats.MaxConsumerLag) + fmt.Printf(" Average: %.2f messages\n", stats.AvgConsumerLag) + fmt.Printf("=========================\n") +} + +// WriteStats writes statistics to a writer (for HTTP endpoint) +func (c *Collector) WriteStats(w io.Writer) { + stats := c.GetStats() + + fmt.Fprintf(w, "# Load Test Statistics\n") + fmt.Fprintf(w, "duration_seconds %v\n", stats.Duration.Seconds()) + fmt.Fprintf(w, "messages_produced %d\n", stats.MessagesProduced) + fmt.Fprintf(w, "messages_consumed %d\n", stats.MessagesConsumed) + fmt.Fprintf(w, "bytes_produced %d\n", stats.BytesProduced) + fmt.Fprintf(w, "bytes_consumed %d\n", stats.BytesConsumed) + fmt.Fprintf(w, "producer_errors %d\n", 
stats.ProducerErrors) + fmt.Fprintf(w, "consumer_errors %d\n", stats.ConsumerErrors) + fmt.Fprintf(w, "producer_throughput_msgs_per_sec %f\n", stats.ProducerThroughput) + fmt.Fprintf(w, "consumer_throughput_msgs_per_sec %f\n", stats.ConsumerThroughput) + fmt.Fprintf(w, "total_consumer_lag %d\n", stats.TotalConsumerLag) + fmt.Fprintf(w, "max_consumer_lag %d\n", stats.MaxConsumerLag) + fmt.Fprintf(w, "avg_consumer_lag %f\n", stats.AvgConsumerLag) + + if stats.LatencyPercentiles != nil { + for percentile, latency := range stats.LatencyPercentiles { + fmt.Fprintf(w, "latency_p%g_seconds %f\n", percentile, latency.Seconds()) + } + } +} + +// calculatePercentiles calculates latency percentiles +func (c *Collector) calculatePercentiles(latencies []time.Duration) map[float64]time.Duration { + if len(latencies) == 0 { + return nil + } + + // Make a copy and sort + sorted := make([]time.Duration, len(latencies)) + copy(sorted, latencies) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i] < sorted[j] + }) + + percentiles := map[float64]time.Duration{ + 50: calculatePercentile(sorted, 50), + 90: calculatePercentile(sorted, 90), + 95: calculatePercentile(sorted, 95), + 99: calculatePercentile(sorted, 99), + 99.9: calculatePercentile(sorted, 99.9), + } + + return percentiles +} + +// calculatePercentile calculates a specific percentile from sorted data +func calculatePercentile(sorted []time.Duration, percentile float64) time.Duration { + if len(sorted) == 0 { + return 0 + } + + index := percentile / 100.0 * float64(len(sorted)-1) + if index == float64(int(index)) { + return sorted[int(index)] + } + + lower := sorted[int(index)] + upper := sorted[int(index)+1] + weight := index - float64(int(index)) + + return time.Duration(float64(lower) + weight*float64(upper-lower)) +} + +// Stats represents the current test statistics +type Stats struct { + Duration time.Duration + MessagesProduced int64 + MessagesConsumed int64 + BytesProduced int64 + BytesConsumed int64 + ProducerErrors int64 + ConsumerErrors int64 + ProducerThroughput float64 + ConsumerThroughput float64 + LatencyPercentiles map[float64]time.Duration + TotalConsumerLag int64 + MaxConsumerLag int64 + AvgConsumerLag float64 +} diff --git a/test/kafka/kafka-client-loadtest/internal/producer/producer.go b/test/kafka/kafka-client-loadtest/internal/producer/producer.go new file mode 100644 index 000000000..167bfeac6 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/producer/producer.go @@ -0,0 +1,770 @@ +package producer + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + "github.com/IBM/sarama" + "github.com/linkedin/goavro/v2" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema" + pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb" + "google.golang.org/protobuf/proto" +) + +// ErrCircuitBreakerOpen indicates that the circuit breaker is open due to consecutive failures +var ErrCircuitBreakerOpen = errors.New("circuit breaker is open") + +// Producer represents a Kafka producer for load testing +type Producer struct { + id int + config *config.Config + metricsCollector *metrics.Collector + saramaProducer sarama.SyncProducer + useConfluent bool + topics []string + avroCodec *goavro.Codec + 
startTime time.Time // Test run start time for generating unique keys + + // Schema management + schemaIDs map[string]int // topic -> schema ID mapping + schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, etc.) + + // Rate limiting + rateLimiter *time.Ticker + + // Message generation + messageCounter int64 + random *rand.Rand + + // Circuit breaker detection + consecutiveFailures int +} + +// Message represents a test message +type Message struct { + ID string `json:"id"` + Timestamp int64 `json:"timestamp"` + ProducerID int `json:"producer_id"` + Counter int64 `json:"counter"` + UserID string `json:"user_id"` + EventType string `json:"event_type"` + Properties map[string]interface{} `json:"properties"` +} + +// New creates a new producer instance +func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, error) { + p := &Producer{ + id: id, + config: cfg, + metricsCollector: collector, + topics: cfg.GetTopicNames(), + random: rand.New(rand.NewSource(time.Now().UnixNano() + int64(id))), + useConfluent: false, // Use Sarama by default, can be made configurable + schemaIDs: make(map[string]int), + schemaFormats: make(map[string]string), + startTime: time.Now(), // Record test start time for unique key generation + } + + // Initialize schema formats for each topic + // Distribute across AVRO, JSON, and PROTOBUF formats + for i, topic := range p.topics { + var schemaFormat string + if cfg.Producers.SchemaFormat != "" { + // Use explicit config if provided + schemaFormat = cfg.Producers.SchemaFormat + } else { + // Distribute across three formats: AVRO, JSON, PROTOBUF + switch i % 3 { + case 0: + schemaFormat = "AVRO" + case 1: + schemaFormat = "JSON" + case 2: + schemaFormat = "PROTOBUF" + } + } + p.schemaFormats[topic] = schemaFormat + log.Printf("Producer %d: Topic %s will use schema format: %s", id, topic, schemaFormat) + } + + // Set up rate limiter if specified + if cfg.Producers.MessageRate > 0 { + p.rateLimiter = time.NewTicker(time.Second / time.Duration(cfg.Producers.MessageRate)) + } + + // Initialize Sarama producer + if err := p.initSaramaProducer(); err != nil { + return nil, fmt.Errorf("failed to initialize Sarama producer: %w", err) + } + + // Initialize Avro codec and register/fetch schemas if schemas are enabled + if cfg.Schemas.Enabled { + if err := p.initAvroCodec(); err != nil { + return nil, fmt.Errorf("failed to initialize Avro codec: %w", err) + } + if err := p.ensureSchemasRegistered(); err != nil { + return nil, fmt.Errorf("failed to ensure schemas are registered: %w", err) + } + if err := p.fetchSchemaIDs(); err != nil { + return nil, fmt.Errorf("failed to fetch schema IDs: %w", err) + } + } + + log.Printf("Producer %d initialized successfully", id) + return p, nil +} + +// initSaramaProducer initializes the Sarama producer +func (p *Producer) initSaramaProducer() error { + config := sarama.NewConfig() + + // Producer configuration + config.Producer.RequiredAcks = sarama.WaitForAll + if p.config.Producers.Acks == "0" { + config.Producer.RequiredAcks = sarama.NoResponse + } else if p.config.Producers.Acks == "1" { + config.Producer.RequiredAcks = sarama.WaitForLocal + } + + config.Producer.Retry.Max = p.config.Producers.Retries + config.Producer.Retry.Backoff = time.Duration(p.config.Producers.RetryBackoffMs) * time.Millisecond + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + + // Compression + switch p.config.Producers.CompressionType { + case "gzip": + config.Producer.Compression = 
sarama.CompressionGZIP + case "snappy": + config.Producer.Compression = sarama.CompressionSnappy + case "lz4": + config.Producer.Compression = sarama.CompressionLZ4 + case "zstd": + config.Producer.Compression = sarama.CompressionZSTD + default: + config.Producer.Compression = sarama.CompressionNone + } + + // Batching + config.Producer.Flush.Messages = p.config.Producers.BatchSize + config.Producer.Flush.Frequency = time.Duration(p.config.Producers.LingerMs) * time.Millisecond + + // Timeouts + config.Net.DialTimeout = 30 * time.Second + config.Net.ReadTimeout = 30 * time.Second + config.Net.WriteTimeout = 30 * time.Second + + // Version + config.Version = sarama.V2_8_0_0 + + // Create producer + producer, err := sarama.NewSyncProducer(p.config.Kafka.BootstrapServers, config) + if err != nil { + return fmt.Errorf("failed to create Sarama producer: %w", err) + } + + p.saramaProducer = producer + return nil +} + +// initAvroCodec initializes the Avro codec for schema-based messages +func (p *Producer) initAvroCodec() error { + // Use the shared LoadTestMessage schema + codec, err := goavro.NewCodec(schema.GetAvroSchema()) + if err != nil { + return fmt.Errorf("failed to create Avro codec: %w", err) + } + + p.avroCodec = codec + return nil +} + +// Run starts the producer and produces messages until the context is cancelled +func (p *Producer) Run(ctx context.Context) error { + log.Printf("Producer %d starting", p.id) + defer log.Printf("Producer %d stopped", p.id) + + // Create topics if they don't exist + if err := p.createTopics(); err != nil { + log.Printf("Producer %d: Failed to create topics: %v", p.id, err) + p.metricsCollector.RecordProducerError() + return err + } + + var wg sync.WaitGroup + errChan := make(chan error, 1) + + // Main production loop + wg.Add(1) + go func() { + defer wg.Done() + if err := p.produceMessages(ctx); err != nil { + errChan <- err + } + }() + + // Wait for completion or error + select { + case <-ctx.Done(): + log.Printf("Producer %d: Context cancelled, shutting down", p.id) + case err := <-errChan: + log.Printf("Producer %d: Stopping due to error: %v", p.id, err) + return err + } + + // Stop rate limiter + if p.rateLimiter != nil { + p.rateLimiter.Stop() + } + + // Wait for goroutines to finish + wg.Wait() + return nil +} + +// produceMessages is the main message production loop +func (p *Producer) produceMessages(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + default: + // Rate limiting + if p.rateLimiter != nil { + select { + case <-p.rateLimiter.C: + // Proceed + case <-ctx.Done(): + return nil + } + } + + if err := p.produceMessage(); err != nil { + log.Printf("Producer %d: Failed to produce message: %v", p.id, err) + p.metricsCollector.RecordProducerError() + + // Check for circuit breaker error + if p.isCircuitBreakerError(err) { + p.consecutiveFailures++ + log.Printf("Producer %d: Circuit breaker error detected (%d/%d consecutive failures)", + p.id, p.consecutiveFailures, 3) + + // Progressive backoff delay to avoid overloading the gateway + backoffDelay := time.Duration(p.consecutiveFailures) * 500 * time.Millisecond + log.Printf("Producer %d: Backing off for %v to avoid overloading gateway", p.id, backoffDelay) + + select { + case <-time.After(backoffDelay): + // Continue after delay + case <-ctx.Done(): + return nil + } + + // If we've hit 3 consecutive circuit breaker errors, stop the producer + if p.consecutiveFailures >= 3 { + log.Printf("Producer %d: Circuit breaker is open - stopping producer after %d 
consecutive failures", + p.id, p.consecutiveFailures) + return fmt.Errorf("%w: stopping producer after %d consecutive failures", ErrCircuitBreakerOpen, p.consecutiveFailures) + } + } else { + // Reset counter for non-circuit breaker errors + p.consecutiveFailures = 0 + } + } else { + // Reset counter on successful message + p.consecutiveFailures = 0 + } + } + } +} + +// produceMessage produces a single message +func (p *Producer) produceMessage() error { + startTime := time.Now() + + // Select random topic + topic := p.topics[p.random.Intn(len(p.topics))] + + // Produce message using Sarama (message will be generated based on topic's schema format) + return p.produceSaramaMessage(topic, startTime) +} + +// produceSaramaMessage produces a message using Sarama +// The message is generated internally based on the topic's schema format +func (p *Producer) produceSaramaMessage(topic string, startTime time.Time) error { + // Generate key + key := p.generateMessageKey() + + // If schemas are enabled, wrap in Confluent Wire Format based on topic's schema format + var messageValue []byte + if p.config.Schemas.Enabled { + schemaID, exists := p.schemaIDs[topic] + if !exists { + return fmt.Errorf("schema ID not found for topic %s", topic) + } + + // Get the schema format for this topic + schemaFormat := p.schemaFormats[topic] + + // CRITICAL FIX: Encode based on schema format, NOT config value_type + // The encoding MUST match what the schema registry and gateway expect + var encodedMessage []byte + var err error + switch schemaFormat { + case "AVRO": + // For Avro schema, encode as Avro binary + encodedMessage, err = p.generateAvroMessage() + if err != nil { + return fmt.Errorf("failed to encode as Avro for topic %s: %w", topic, err) + } + case "JSON": + // For JSON schema, encode as JSON + encodedMessage, err = p.generateJSONMessage() + if err != nil { + return fmt.Errorf("failed to encode as JSON for topic %s: %w", topic, err) + } + case "PROTOBUF": + // For PROTOBUF schema, encode as Protobuf binary + encodedMessage, err = p.generateProtobufMessage() + if err != nil { + return fmt.Errorf("failed to encode as Protobuf for topic %s: %w", topic, err) + } + default: + // Unknown format - fallback to JSON + encodedMessage, err = p.generateJSONMessage() + if err != nil { + return fmt.Errorf("failed to encode as JSON (unknown format fallback) for topic %s: %w", topic, err) + } + } + + // Wrap in Confluent wire format (magic byte + schema ID + payload) + messageValue = p.createConfluentWireFormat(schemaID, encodedMessage) + } else { + // No schemas - generate message based on config value_type + var err error + messageValue, err = p.generateMessage() + if err != nil { + return fmt.Errorf("failed to generate message: %w", err) + } + } + + msg := &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(messageValue), + } + + // Add headers if configured + if p.config.Producers.IncludeHeaders { + msg.Headers = []sarama.RecordHeader{ + {Key: []byte("producer_id"), Value: []byte(fmt.Sprintf("%d", p.id))}, + {Key: []byte("timestamp"), Value: []byte(fmt.Sprintf("%d", startTime.UnixNano()))}, + } + } + + // Produce message + _, _, err := p.saramaProducer.SendMessage(msg) + if err != nil { + return err + } + + // Record metrics + latency := time.Since(startTime) + p.metricsCollector.RecordProducedMessage(len(messageValue), latency) + + return nil +} + +// generateMessage generates a test message +func (p *Producer) generateMessage() ([]byte, error) { + 
p.messageCounter++ + + switch p.config.Producers.ValueType { + case "avro": + return p.generateAvroMessage() + case "json": + return p.generateJSONMessage() + case "binary": + return p.generateBinaryMessage() + default: + return p.generateJSONMessage() + } +} + +// generateJSONMessage generates a JSON test message +func (p *Producer) generateJSONMessage() ([]byte, error) { + msg := Message{ + ID: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter), + Timestamp: time.Now().UnixNano(), + ProducerID: p.id, + Counter: p.messageCounter, + UserID: fmt.Sprintf("user-%d", p.random.Intn(10000)), + EventType: p.randomEventType(), + Properties: map[string]interface{}{ + "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)), + "page_views": fmt.Sprintf("%d", p.random.Intn(100)), // String for Avro map<string,string> + "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), // String for Avro map<string,string> + "country": p.randomCountry(), + "device_type": p.randomDeviceType(), + "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)), + }, + } + + // Marshal to JSON (no padding - let natural message size be used) + messageBytes, err := json.Marshal(msg) + if err != nil { + return nil, err + } + + return messageBytes, nil +} + +// generateProtobufMessage generates a Protobuf-encoded message +func (p *Producer) generateProtobufMessage() ([]byte, error) { + // Create protobuf message + protoMsg := &pb.LoadTestMessage{ + Id: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter), + Timestamp: time.Now().UnixNano(), + ProducerId: int32(p.id), + Counter: p.messageCounter, + UserId: fmt.Sprintf("user-%d", p.random.Intn(10000)), + EventType: p.randomEventType(), + Properties: map[string]string{ + "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)), + "page_views": fmt.Sprintf("%d", p.random.Intn(100)), + "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), + "country": p.randomCountry(), + "device_type": p.randomDeviceType(), + "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)), + }, + } + + // Marshal to protobuf binary + messageBytes, err := proto.Marshal(protoMsg) + if err != nil { + return nil, err + } + + return messageBytes, nil +} + +// generateAvroMessage generates an Avro-encoded message with Confluent Wire Format +// NOTE: Avro messages are NOT padded - they have their own binary format +func (p *Producer) generateAvroMessage() ([]byte, error) { + if p.avroCodec == nil { + return nil, fmt.Errorf("Avro codec not initialized") + } + + // Create Avro-compatible record matching the LoadTestMessage schema + record := map[string]interface{}{ + "id": fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter), + "timestamp": time.Now().UnixNano(), + "producer_id": p.id, + "counter": p.messageCounter, + "user_id": fmt.Sprintf("user-%d", p.random.Intn(10000)), + "event_type": p.randomEventType(), + "properties": map[string]interface{}{ + "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)), + "page_views": fmt.Sprintf("%d", p.random.Intn(100)), + "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), + "country": p.randomCountry(), + "device_type": p.randomDeviceType(), + "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)), + }, + } + + // Encode to Avro binary + avroBytes, err := p.avroCodec.BinaryFromNative(nil, record) + if err != nil { + return nil, err + } + + return avroBytes, nil +} + +// generateBinaryMessage generates a binary test 
message (no padding) +func (p *Producer) generateBinaryMessage() ([]byte, error) { + // Create a simple binary message format: + // [producer_id:4][counter:8][timestamp:8] + message := make([]byte, 20) + + // Producer ID (4 bytes) + message[0] = byte(p.id >> 24) + message[1] = byte(p.id >> 16) + message[2] = byte(p.id >> 8) + message[3] = byte(p.id) + + // Counter (8 bytes) + for i := 0; i < 8; i++ { + message[4+i] = byte(p.messageCounter >> (56 - i*8)) + } + + // Timestamp (8 bytes) + timestamp := time.Now().UnixNano() + for i := 0; i < 8; i++ { + message[12+i] = byte(timestamp >> (56 - i*8)) + } + + return message, nil +} + +// generateMessageKey generates a message key based on the configured distribution +// Keys are prefixed with a test run ID to track messages across test runs +func (p *Producer) generateMessageKey() string { + // Use test start time as run ID (format: YYYYMMDD-HHMMSS) + runID := p.startTime.Format("20060102-150405") + + switch p.config.Producers.KeyDistribution { + case "sequential": + return fmt.Sprintf("run-%s-key-%d", runID, p.messageCounter) + case "uuid": + return fmt.Sprintf("run-%s-uuid-%d-%d-%d", runID, p.id, time.Now().UnixNano(), p.random.Intn(1000000)) + default: // random + return fmt.Sprintf("run-%s-key-%d", runID, p.random.Intn(10000)) + } +} + +// createTopics creates the test topics if they don't exist +func (p *Producer) createTopics() error { + // Use Sarama admin client to create topics + config := sarama.NewConfig() + config.Version = sarama.V2_8_0_0 + + admin, err := sarama.NewClusterAdmin(p.config.Kafka.BootstrapServers, config) + if err != nil { + return fmt.Errorf("failed to create admin client: %w", err) + } + defer admin.Close() + + // Create topic specifications + topicSpecs := make(map[string]*sarama.TopicDetail) + for _, topic := range p.topics { + topicSpecs[topic] = &sarama.TopicDetail{ + NumPartitions: int32(p.config.Topics.Partitions), + ReplicationFactor: int16(p.config.Topics.ReplicationFactor), + ConfigEntries: map[string]*string{ + "cleanup.policy": &p.config.Topics.CleanupPolicy, + "retention.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.RetentionMs)), + "segment.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.SegmentMs)), + }, + } + } + + // Create topics + for _, topic := range p.topics { + err = admin.CreateTopic(topic, topicSpecs[topic], false) + if err != nil && err != sarama.ErrTopicAlreadyExists { + log.Printf("Producer %d: Warning - failed to create topic %s: %v", p.id, topic, err) + } else { + log.Printf("Producer %d: Successfully created topic %s", p.id, topic) + } + } + + return nil +} + +// Close closes the producer and cleans up resources +func (p *Producer) Close() error { + log.Printf("Producer %d: Closing", p.id) + + if p.rateLimiter != nil { + p.rateLimiter.Stop() + } + + if p.saramaProducer != nil { + return p.saramaProducer.Close() + } + + return nil +} + +// Helper functions + +func stringPtr(s string) *string { + return &s +} + +func joinStrings(strs []string, sep string) string { + if len(strs) == 0 { + return "" + } + + result := strs[0] + for i := 1; i < len(strs); i++ { + result += sep + strs[i] + } + return result +} + +func (p *Producer) randomEventType() string { + events := []string{"login", "logout", "view", "click", "purchase", "signup", "search", "download"} + return events[p.random.Intn(len(events))] +} + +func (p *Producer) randomCountry() string { + countries := []string{"US", "CA", "UK", "DE", "FR", "JP", "AU", "BR", "IN", "CN"} + return countries[p.random.Intn(len(countries))] +} + +func 
(p *Producer) randomDeviceType() string { + devices := []string{"desktop", "mobile", "tablet", "tv", "watch"} + return devices[p.random.Intn(len(devices))] +} + +// fetchSchemaIDs fetches schema IDs from Schema Registry for all topics +func (p *Producer) fetchSchemaIDs() error { + for _, topic := range p.topics { + subject := topic + "-value" + schemaID, err := p.getSchemaID(subject) + if err != nil { + return fmt.Errorf("failed to get schema ID for subject %s: %w", subject, err) + } + p.schemaIDs[topic] = schemaID + log.Printf("Producer %d: Fetched schema ID %d for topic %s", p.id, schemaID, topic) + } + return nil +} + +// getSchemaID fetches the latest schema ID for a subject from Schema Registry +func (p *Producer) getSchemaID(subject string) (int, error) { + url := fmt.Sprintf("%s/subjects/%s/versions/latest", p.config.SchemaRegistry.URL, subject) + + resp, err := http.Get(url) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("failed to get schema: status=%d, body=%s", resp.StatusCode, string(body)) + } + + var schemaResp struct { + ID int `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil { + return 0, err + } + + return schemaResp.ID, nil +} + +// ensureSchemasRegistered ensures that schemas are registered for all topics +// It registers schemas if they don't exist, but doesn't fail if they already do +func (p *Producer) ensureSchemasRegistered() error { + for _, topic := range p.topics { + subject := topic + "-value" + + // First check if schema already exists + schemaID, err := p.getSchemaID(subject) + if err == nil { + log.Printf("Producer %d: Schema already exists for topic %s (ID: %d), skipping registration", p.id, topic, schemaID) + continue + } + + // Schema doesn't exist, register it + log.Printf("Producer %d: Registering schema for topic %s", p.id, topic) + if err := p.registerTopicSchema(subject); err != nil { + return fmt.Errorf("failed to register schema for topic %s: %w", topic, err) + } + log.Printf("Producer %d: Schema registered successfully for topic %s", p.id, topic) + } + return nil +} + +// registerTopicSchema registers the schema for a specific topic based on configured format +func (p *Producer) registerTopicSchema(subject string) error { + // Extract topic name from subject (remove -value or -key suffix) + topicName := strings.TrimSuffix(strings.TrimSuffix(subject, "-value"), "-key") + + // Get schema format for this topic + schemaFormat, ok := p.schemaFormats[topicName] + if !ok { + // Fallback to config or default + schemaFormat = p.config.Producers.SchemaFormat + if schemaFormat == "" { + schemaFormat = "AVRO" + } + } + + var schemaStr string + var schemaType string + + switch strings.ToUpper(schemaFormat) { + case "AVRO": + schemaStr = schema.GetAvroSchema() + schemaType = "AVRO" + case "JSON", "JSON_SCHEMA": + schemaStr = schema.GetJSONSchema() + schemaType = "JSON" + case "PROTOBUF": + schemaStr = schema.GetProtobufSchema() + schemaType = "PROTOBUF" + default: + return fmt.Errorf("unsupported schema format: %s", schemaFormat) + } + + url := fmt.Sprintf("%s/subjects/%s/versions", p.config.SchemaRegistry.URL, subject) + + payload := map[string]interface{}{ + "schema": schemaStr, + "schemaType": schemaType, + } + + jsonPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal schema payload: %w", err) + } + + resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", 
strings.NewReader(string(jsonPayload))) + if err != nil { + return fmt.Errorf("failed to register schema: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("schema registration failed: status=%d, body=%s", resp.StatusCode, string(body)) + } + + var registerResp struct { + ID int `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&registerResp); err != nil { + return fmt.Errorf("failed to decode registration response: %w", err) + } + + log.Printf("Schema registered with ID: %d (format: %s)", registerResp.ID, schemaType) + return nil +} + +// createConfluentWireFormat creates a message in Confluent Wire Format +// This matches the implementation in weed/mq/kafka/schema/envelope.go CreateConfluentEnvelope +func (p *Producer) createConfluentWireFormat(schemaID int, avroData []byte) []byte { + // Confluent Wire Format: [magic_byte(1)][schema_id(4)][payload(n)] + // magic_byte = 0x00 + // schema_id = 4 bytes big-endian + wireFormat := make([]byte, 5+len(avroData)) + wireFormat[0] = 0x00 // Magic byte + binary.BigEndian.PutUint32(wireFormat[1:5], uint32(schemaID)) + copy(wireFormat[5:], avroData) + return wireFormat +} + +// isCircuitBreakerError checks if an error indicates that the circuit breaker is open +func (p *Producer) isCircuitBreakerError(err error) bool { + return errors.Is(err, ErrCircuitBreakerOpen) +} diff --git a/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto new file mode 100644 index 000000000..dfe00b72f --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package com.seaweedfs.loadtest; + +option go_package = "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"; + +message LoadTestMessage { + string id = 1; + int64 timestamp = 2; + int32 producer_id = 3; + int64 counter = 4; + string user_id = 5; + string event_type = 6; + map<string, string> properties = 7; +} + diff --git a/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go new file mode 100644 index 000000000..3ed58aa9e --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go @@ -0,0 +1,185 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v5.29.3 +// source: loadtest.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type LoadTestMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + ProducerId int32 `protobuf:"varint,3,opt,name=producer_id,json=producerId,proto3" json:"producer_id,omitempty"` + Counter int64 `protobuf:"varint,4,opt,name=counter,proto3" json:"counter,omitempty"` + UserId string `protobuf:"bytes,5,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + EventType string `protobuf:"bytes,6,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + Properties map[string]string `protobuf:"bytes,7,rep,name=properties,proto3" json:"properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LoadTestMessage) Reset() { + *x = LoadTestMessage{} + mi := &file_loadtest_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LoadTestMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LoadTestMessage) ProtoMessage() {} + +func (x *LoadTestMessage) ProtoReflect() protoreflect.Message { + mi := &file_loadtest_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LoadTestMessage.ProtoReflect.Descriptor instead. +func (*LoadTestMessage) Descriptor() ([]byte, []int) { + return file_loadtest_proto_rawDescGZIP(), []int{0} +} + +func (x *LoadTestMessage) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *LoadTestMessage) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *LoadTestMessage) GetProducerId() int32 { + if x != nil { + return x.ProducerId + } + return 0 +} + +func (x *LoadTestMessage) GetCounter() int64 { + if x != nil { + return x.Counter + } + return 0 +} + +func (x *LoadTestMessage) GetUserId() string { + if x != nil { + return x.UserId + } + return "" +} + +func (x *LoadTestMessage) GetEventType() string { + if x != nil { + return x.EventType + } + return "" +} + +func (x *LoadTestMessage) GetProperties() map[string]string { + if x != nil { + return x.Properties + } + return nil +} + +var File_loadtest_proto protoreflect.FileDescriptor + +const file_loadtest_proto_rawDesc = "" + + "\n" + + "\x0eloadtest.proto\x12\x16com.seaweedfs.loadtest\"\xca\x02\n" + + "\x0fLoadTestMessage\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x1c\n" + + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x1f\n" + + "\vproducer_id\x18\x03 \x01(\x05R\n" + + "producerId\x12\x18\n" + + "\acounter\x18\x04 \x01(\x03R\acounter\x12\x17\n" + + "\auser_id\x18\x05 \x01(\tR\x06userId\x12\x1d\n" + + "\n" + + "event_type\x18\x06 \x01(\tR\teventType\x12W\n" + + "\n" + + "properties\x18\a \x03(\v27.com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntryR\n" + + "properties\x1a=\n" + + "\x0fPropertiesEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01BTZRgithub.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pbb\x06proto3" + +var ( + file_loadtest_proto_rawDescOnce sync.Once + file_loadtest_proto_rawDescData []byte +) + +func 
file_loadtest_proto_rawDescGZIP() []byte { + file_loadtest_proto_rawDescOnce.Do(func() { + file_loadtest_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc))) + }) + return file_loadtest_proto_rawDescData +} + +var file_loadtest_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_loadtest_proto_goTypes = []any{ + (*LoadTestMessage)(nil), // 0: com.seaweedfs.loadtest.LoadTestMessage + nil, // 1: com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry +} +var file_loadtest_proto_depIdxs = []int32{ + 1, // 0: com.seaweedfs.loadtest.LoadTestMessage.properties:type_name -> com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_loadtest_proto_init() } +func file_loadtest_proto_init() { + if File_loadtest_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_loadtest_proto_goTypes, + DependencyIndexes: file_loadtest_proto_depIdxs, + MessageInfos: file_loadtest_proto_msgTypes, + }.Build() + File_loadtest_proto = out.File + file_loadtest_proto_goTypes = nil + file_loadtest_proto_depIdxs = nil +} diff --git a/test/kafka/kafka-client-loadtest/internal/schema/schemas.go b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go new file mode 100644 index 000000000..011b28ef2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go @@ -0,0 +1,58 @@ +package schema + +// GetAvroSchema returns the Avro schema for load test messages +func GetAvroSchema() string { + return `{ + "type": "record", + "name": "LoadTestMessage", + "namespace": "com.seaweedfs.loadtest", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "producer_id", "type": "int"}, + {"name": "counter", "type": "long"}, + {"name": "user_id", "type": "string"}, + {"name": "event_type", "type": "string"}, + {"name": "properties", "type": {"type": "map", "values": "string"}} + ] + }` +} + +// GetJSONSchema returns the JSON Schema for load test messages +func GetJSONSchema() string { + return `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "LoadTestMessage", + "type": "object", + "properties": { + "id": {"type": "string"}, + "timestamp": {"type": "integer"}, + "producer_id": {"type": "integer"}, + "counter": {"type": "integer"}, + "user_id": {"type": "string"}, + "event_type": {"type": "string"}, + "properties": { + "type": "object", + "additionalProperties": {"type": "string"} + } + }, + "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"] + }` +} + +// GetProtobufSchema returns the Protobuf schema for load test messages +func GetProtobufSchema() string { + return `syntax = "proto3"; + +package com.seaweedfs.loadtest; + +message LoadTestMessage { + string id = 1; + int64 timestamp = 2; + int32 producer_id = 3; + int64 counter = 4; + string user_id = 5; + string event_type = 6; + map<string, string> properties = 7; +}` +} diff --git 
a/test/kafka/kafka-client-loadtest/loadtest b/test/kafka/kafka-client-loadtest/loadtest Binary files differnew file mode 100755 index 000000000..e5a23f173 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/loadtest diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json new file mode 100644 index 000000000..3ea04fb68 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json @@ -0,0 +1,106 @@ +{ + "dashboard": { + "id": null, + "title": "Kafka Client Load Test Dashboard", + "tags": ["kafka", "loadtest", "seaweedfs"], + "timezone": "browser", + "panels": [ + { + "id": 1, + "title": "Messages Produced/Consumed", + "type": "stat", + "targets": [ + { + "expr": "rate(kafka_loadtest_messages_produced_total[5m])", + "legendFormat": "Produced/sec" + }, + { + "expr": "rate(kafka_loadtest_messages_consumed_total[5m])", + "legendFormat": "Consumed/sec" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0} + }, + { + "id": 2, + "title": "Message Latency", + "type": "graph", + "targets": [ + { + "expr": "histogram_quantile(0.95, kafka_loadtest_message_latency_seconds)", + "legendFormat": "95th percentile" + }, + { + "expr": "histogram_quantile(0.99, kafka_loadtest_message_latency_seconds)", + "legendFormat": "99th percentile" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0} + }, + { + "id": 3, + "title": "Error Rates", + "type": "graph", + "targets": [ + { + "expr": "rate(kafka_loadtest_producer_errors_total[5m])", + "legendFormat": "Producer Errors/sec" + }, + { + "expr": "rate(kafka_loadtest_consumer_errors_total[5m])", + "legendFormat": "Consumer Errors/sec" + } + ], + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8} + }, + { + "id": 4, + "title": "Throughput (MB/s)", + "type": "graph", + "targets": [ + { + "expr": "rate(kafka_loadtest_bytes_produced_total[5m]) / 1024 / 1024", + "legendFormat": "Produced MB/s" + }, + { + "expr": "rate(kafka_loadtest_bytes_consumed_total[5m]) / 1024 / 1024", + "legendFormat": "Consumed MB/s" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16} + }, + { + "id": 5, + "title": "Active Clients", + "type": "stat", + "targets": [ + { + "expr": "kafka_loadtest_active_producers", + "legendFormat": "Producers" + }, + { + "expr": "kafka_loadtest_active_consumers", + "legendFormat": "Consumers" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16} + }, + { + "id": 6, + "title": "Consumer Lag", + "type": "graph", + "targets": [ + { + "expr": "kafka_loadtest_consumer_lag_messages", + "legendFormat": "{{consumer_group}}-{{topic}}-{{partition}}" + } + ], + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24} + } + ], + "time": {"from": "now-30m", "to": "now"}, + "refresh": "5s", + "schemaVersion": 16, + "version": 0 + } +} diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json new file mode 100644 index 000000000..4c2261f22 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json @@ -0,0 +1,62 @@ +{ + "dashboard": { + "id": null, + "title": "SeaweedFS Cluster Dashboard", + "tags": ["seaweedfs", "storage"], + "timezone": "browser", + "panels": [ + { + "id": 1, + "title": "Master Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-master\"}", + "legendFormat": "Master Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 0, "y": 0} + }, 
+ { + "id": 2, + "title": "Volume Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-volume\"}", + "legendFormat": "Volume Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 6, "y": 0} + }, + { + "id": 3, + "title": "Filer Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-filer\"}", + "legendFormat": "Filer Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 12, "y": 0} + }, + { + "id": 4, + "title": "MQ Broker Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-mq-broker\"}", + "legendFormat": "MQ Broker Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 18, "y": 0} + } + ], + "time": {"from": "now-30m", "to": "now"}, + "refresh": "10s", + "schemaVersion": 16, + "version": 0 + } +} diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml new file mode 100644 index 000000000..0bcf3d818 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 000000000..fb78be722 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + orgId: 1 + url: http://prometheus:9090 + basicAuth: false + isDefault: true + editable: true + version: 1 diff --git a/test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml b/test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml new file mode 100644 index 000000000..f62091d52 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml @@ -0,0 +1,54 @@ +# Prometheus configuration for Kafka Load Test monitoring + +global: + scrape_interval: 15s + evaluation_interval: 15s + +rule_files: + # - "first_rules.yml" + # - "second_rules.yml" + +scrape_configs: + # Scrape Prometheus itself + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + # Scrape load test metrics + - job_name: 'kafka-loadtest' + static_configs: + - targets: ['kafka-client-loadtest-runner:8080'] + scrape_interval: 5s + metrics_path: '/metrics' + + # Scrape SeaweedFS Master metrics + - job_name: 'seaweedfs-master' + static_configs: + - targets: ['seaweedfs-master:9333'] + metrics_path: '/metrics' + + # Scrape SeaweedFS Volume metrics + - job_name: 'seaweedfs-volume' + static_configs: + - targets: ['seaweedfs-volume:8080'] + metrics_path: '/metrics' + + # Scrape SeaweedFS Filer metrics + - job_name: 'seaweedfs-filer' + static_configs: + - targets: ['seaweedfs-filer:8888'] + metrics_path: '/metrics' + + # Scrape SeaweedFS MQ Broker metrics (if available) + - job_name: 'seaweedfs-mq-broker' + static_configs: + - targets: ['seaweedfs-mq-broker:17777'] + metrics_path: '/metrics' + scrape_interval: 10s + + # Scrape Kafka Gateway metrics (if available) + - job_name: 'kafka-gateway' + static_configs: + - targets: ['kafka-gateway:9093'] + metrics_path: '/metrics' + scrape_interval: 10s diff --git 
a/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh b/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh new file mode 100755 index 000000000..58cb0f114 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh @@ -0,0 +1,423 @@ +#!/bin/bash + +# Register schemas with Schema Registry for load testing +# This script registers the necessary schemas before running load tests + +set -euo pipefail + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +log_info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +log_warning() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Configuration +SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"} +TIMEOUT=${TIMEOUT:-60} +CHECK_INTERVAL=${CHECK_INTERVAL:-2} + +# Wait for Schema Registry to be ready +wait_for_schema_registry() { + log_info "Waiting for Schema Registry to be ready..." + + local elapsed=0 + while [[ $elapsed -lt $TIMEOUT ]]; do + if curl -sf --max-time 5 "$SCHEMA_REGISTRY_URL/subjects" >/dev/null 2>&1; then + log_success "Schema Registry is ready!" + return 0 + fi + + log_info "Schema Registry not ready yet. Waiting ${CHECK_INTERVAL}s... (${elapsed}/${TIMEOUT}s)" + sleep $CHECK_INTERVAL + elapsed=$((elapsed + CHECK_INTERVAL)) + done + + log_error "Schema Registry did not become ready within ${TIMEOUT} seconds" + return 1 +} + +# Register a schema for a subject +register_schema() { + local subject=$1 + local schema=$2 + local schema_type=${3:-"AVRO"} + local max_attempts=5 + local attempt=1 + + log_info "Registering schema for subject: $subject" + + # Create the schema registration payload + local escaped_schema=$(echo "$schema" | jq -Rs .) + local payload=$(cat <<EOF +{ + "schema": $escaped_schema, + "schemaType": "$schema_type" +} +EOF +) + + while [[ $attempt -le $max_attempts ]]; do + # Register the schema (with 30 second timeout) + local response + response=$(curl -s --max-time 30 -X POST \ + -H "Content-Type: application/vnd.schemaregistry.v1+json" \ + -d "$payload" \ + "$SCHEMA_REGISTRY_URL/subjects/$subject/versions" 2>/dev/null) + + if echo "$response" | jq -e '.id' >/dev/null 2>&1; then + local schema_id + schema_id=$(echo "$response" | jq -r '.id') + if [[ $attempt -gt 1 ]]; then + log_success "- Schema registered for $subject with ID: $schema_id [attempt $attempt]" + else + log_success "- Schema registered for $subject with ID: $schema_id" + fi + return 0 + fi + + # Check if it's a consumer lag timeout (error_code 50002) + local error_code + error_code=$(echo "$response" | jq -r '.error_code // empty' 2>/dev/null) + + if [[ "$error_code" == "50002" && $attempt -lt $max_attempts ]]; then + # Consumer lag timeout - wait longer for consumer to catch up + # Use exponential backoff: 1s, 2s, 4s, 8s + local wait_time=$(echo "2 ^ ($attempt - 1)" | bc) + log_warning "Schema Registry consumer lag detected for $subject, waiting ${wait_time}s before retry (attempt $attempt)..." 
+ sleep "$wait_time" + attempt=$((attempt + 1)) + else + # Other error or max attempts reached + log_error "x Failed to register schema for $subject" + log_error "Response: $response" + return 1 + fi + done + + return 1 +} + +# Verify a schema exists (single attempt) +verify_schema() { + local subject=$1 + + local response + response=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" 2>/dev/null) + + if echo "$response" | jq -e '.id' >/dev/null 2>&1; then + local schema_id + local version + schema_id=$(echo "$response" | jq -r '.id') + version=$(echo "$response" | jq -r '.version') + log_success "- Schema verified for $subject (ID: $schema_id, Version: $version)" + return 0 + else + return 1 + fi +} + +# Verify a schema exists with retry logic (handles Schema Registry consumer lag) +verify_schema_with_retry() { + local subject=$1 + local max_attempts=10 + local attempt=1 + + log_info "Verifying schema for subject: $subject" + + while [[ $attempt -le $max_attempts ]]; do + local response + response=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" 2>/dev/null) + + if echo "$response" | jq -e '.id' >/dev/null 2>&1; then + local schema_id + local version + schema_id=$(echo "$response" | jq -r '.id') + version=$(echo "$response" | jq -r '.version') + + if [[ $attempt -gt 1 ]]; then + log_success "- Schema verified for $subject (ID: $schema_id, Version: $version) [attempt $attempt]" + else + log_success "- Schema verified for $subject (ID: $schema_id, Version: $version)" + fi + return 0 + fi + + # Schema not found, wait and retry (handles Schema Registry consumer lag) + if [[ $attempt -lt $max_attempts ]]; then + # Longer exponential backoff for Schema Registry consumer lag: 0.5s, 1s, 2s, 3s, 4s... + local wait_time=$(echo "scale=1; 0.5 * $attempt" | bc) + sleep "$wait_time" + attempt=$((attempt + 1)) + else + log_error "x Schema not found for $subject (tried $max_attempts times)" + return 1 + fi + done + + return 1 +} + +# Register load test schemas (optimized for batch registration) +register_loadtest_schemas() { + log_info "Registering load test schemas with multiple formats..." 
+ + # Define the Avro schema for load test messages + local avro_value_schema='{ + "type": "record", + "name": "LoadTestMessage", + "namespace": "com.seaweedfs.loadtest", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "producer_id", "type": "int"}, + {"name": "counter", "type": "long"}, + {"name": "user_id", "type": "string"}, + {"name": "event_type", "type": "string"}, + {"name": "properties", "type": {"type": "map", "values": "string"}} + ] + }' + + # Define the JSON schema for load test messages + local json_value_schema='{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "LoadTestMessage", + "type": "object", + "properties": { + "id": {"type": "string"}, + "timestamp": {"type": "integer"}, + "producer_id": {"type": "integer"}, + "counter": {"type": "integer"}, + "user_id": {"type": "string"}, + "event_type": {"type": "string"}, + "properties": { + "type": "object", + "additionalProperties": {"type": "string"} + } + }, + "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"] + }' + + # Define the Protobuf schema for load test messages + local protobuf_value_schema='syntax = "proto3"; + +package com.seaweedfs.loadtest; + +message LoadTestMessage { + string id = 1; + int64 timestamp = 2; + int32 producer_id = 3; + int64 counter = 4; + string user_id = 5; + string event_type = 6; + map<string, string> properties = 7; +}' + + # Define the key schema (simple string) + local avro_key_schema='{"type": "string"}' + local json_key_schema='{"type": "string"}' + local protobuf_key_schema='syntax = "proto3"; message Key { string key = 1; }' + + # Register schemas for all load test topics with different formats + local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4") + local success_count=0 + local total_schemas=0 + + # Distribute formats: topic-0=AVRO, topic-1=JSON, topic-2=PROTOBUF, topic-3=AVRO, topic-4=JSON + local idx=0 + for topic in "${topics[@]}"; do + local format + local value_schema + local key_schema + + # Determine format based on topic index (same as producer logic) + case $((idx % 3)) in + 0) + format="AVRO" + value_schema="$avro_value_schema" + key_schema="$avro_key_schema" + ;; + 1) + format="JSON" + value_schema="$json_value_schema" + key_schema="$json_key_schema" + ;; + 2) + format="PROTOBUF" + value_schema="$protobuf_value_schema" + key_schema="$protobuf_key_schema" + ;; + esac + + log_info "Registering $topic with $format schema..." + + # Register value schema + if register_schema "${topic}-value" "$value_schema" "$format"; then + success_count=$((success_count + 1)) + fi + total_schemas=$((total_schemas + 1)) + + # Small delay to let Schema Registry consumer process (prevents consumer lag) + sleep 0.2 + + # Register key schema + if register_schema "${topic}-key" "$key_schema" "$format"; then + success_count=$((success_count + 1)) + fi + total_schemas=$((total_schemas + 1)) + + # Small delay to let Schema Registry consumer process (prevents consumer lag) + sleep 0.2 + + idx=$((idx + 1)) + done + + log_info "Schema registration summary: $success_count/$total_schemas schemas registered successfully" + log_info "Format distribution: topic-0=AVRO, topic-1=JSON, topic-2=PROTOBUF, topic-3=AVRO, topic-4=JSON" + + if [[ $success_count -eq $total_schemas ]]; then + log_success "All load test schemas registered successfully with multiple formats!" 
+ return 0 + else + log_error "Some schemas failed to register" + return 1 + fi +} + +# Verify all schemas are registered +verify_loadtest_schemas() { + log_info "Verifying load test schemas..." + + local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4") + local success_count=0 + local total_schemas=0 + + for topic in "${topics[@]}"; do + # Verify value schema with retry (handles Schema Registry consumer lag) + if verify_schema_with_retry "${topic}-value"; then + success_count=$((success_count + 1)) + fi + total_schemas=$((total_schemas + 1)) + + # Verify key schema with retry (handles Schema Registry consumer lag) + if verify_schema_with_retry "${topic}-key"; then + success_count=$((success_count + 1)) + fi + total_schemas=$((total_schemas + 1)) + done + + log_info "Schema verification summary: $success_count/$total_schemas schemas verified" + + if [[ $success_count -eq $total_schemas ]]; then + log_success "All load test schemas verified successfully!" + return 0 + else + log_error "Some schemas are missing or invalid" + return 1 + fi +} + +# List all registered subjects +list_subjects() { + log_info "Listing all registered subjects..." + + local subjects + subjects=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null) + + if echo "$subjects" | jq -e '.[]' >/dev/null 2>&1; then + # Use process substitution instead of pipeline to avoid subshell exit code issues + while IFS= read -r subject; do + log_info " - $subject" + done < <(echo "$subjects" | jq -r '.[]') + else + log_warning "No subjects found or Schema Registry not accessible" + fi + + return 0 +} + +# Clean up schemas (for testing) +cleanup_schemas() { + log_warning "Cleaning up load test schemas..." + + local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4") + + for topic in "${topics[@]}"; do + # Delete value schema (with timeout) + curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-value" >/dev/null 2>&1 || true + curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-value?permanent=true" >/dev/null 2>&1 || true + + # Delete key schema (with timeout) + curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-key" >/dev/null 2>&1 || true + curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-key?permanent=true" >/dev/null 2>&1 || true + done + + log_success "Schema cleanup completed" +} + +# Main function +main() { + case "${1:-register}" in + "register") + wait_for_schema_registry + register_loadtest_schemas + ;; + "verify") + wait_for_schema_registry + verify_loadtest_schemas + ;; + "list") + wait_for_schema_registry + list_subjects + ;; + "cleanup") + wait_for_schema_registry + cleanup_schemas + ;; + "full") + wait_for_schema_registry + register_loadtest_schemas + # Wait for Schema Registry consumer to catch up before verification + log_info "Waiting 3 seconds for Schema Registry consumer to process all schemas..." 
+ sleep 3 + verify_loadtest_schemas + list_subjects + ;; + *) + echo "Usage: $0 [register|verify|list|cleanup|full]" + echo "" + echo "Commands:" + echo " register - Register load test schemas (default)" + echo " verify - Verify schemas are registered" + echo " list - List all registered subjects" + echo " cleanup - Clean up load test schemas" + echo " full - Register, verify, and list schemas" + echo "" + echo "Environment variables:" + echo " SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)" + echo " TIMEOUT - Maximum time to wait for Schema Registry (default: 60)" + echo " CHECK_INTERVAL - Check interval in seconds (default: 2)" + exit 1 + ;; + esac + + return 0 +} + +main "$@" diff --git a/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh b/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh new file mode 100755 index 000000000..7f6ddc79a --- /dev/null +++ b/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh @@ -0,0 +1,480 @@ +#!/bin/bash + +# Kafka Client Load Test Runner Script +# This script helps run various load test scenarios against SeaweedFS Kafka Gateway + +set -euo pipefail + +# Default configuration +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +DOCKER_COMPOSE_FILE="$PROJECT_DIR/docker-compose.yml" +CONFIG_FILE="$PROJECT_DIR/config/loadtest.yaml" + +# Default test parameters +TEST_MODE="comprehensive" +TEST_DURATION="300s" +PRODUCER_COUNT=10 +CONSUMER_COUNT=5 +MESSAGE_RATE=1000 +MESSAGE_SIZE=1024 +TOPIC_COUNT=5 +PARTITIONS_PER_TOPIC=3 + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Function to print colored output +log_info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +log_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to show usage +show_usage() { + cat << EOF +Kafka Client Load Test Runner + +Usage: $0 [OPTIONS] [COMMAND] + +Commands: + start Start the load test infrastructure and run tests + stop Stop all services + restart Restart all services + status Show service status + logs Show logs from all services + clean Clean up all resources (volumes, networks, etc.) 
+ monitor Start monitoring stack (Prometheus + Grafana) + scenarios Run predefined test scenarios + +Options: + -m, --mode MODE Test mode: producer, consumer, comprehensive (default: comprehensive) + -d, --duration DURATION Test duration (default: 300s) + -p, --producers COUNT Number of producers (default: 10) + -c, --consumers COUNT Number of consumers (default: 5) + -r, --rate RATE Messages per second per producer (default: 1000) + -s, --size SIZE Message size in bytes (default: 1024) + -t, --topics COUNT Number of topics (default: 5) + --partitions COUNT Partitions per topic (default: 3) + --config FILE Configuration file (default: config/loadtest.yaml) + --monitoring Enable monitoring stack + --wait-ready Wait for services to be ready before starting tests + -v, --verbose Verbose output + -h, --help Show this help message + +Examples: + # Run comprehensive test for 5 minutes + $0 start -m comprehensive -d 5m + + # Run producer-only test with high throughput + $0 start -m producer -p 20 -r 2000 -d 10m + + # Run consumer-only test + $0 start -m consumer -c 10 + + # Run with monitoring + $0 start --monitoring -d 15m + + # Clean up everything + $0 clean + +Predefined Scenarios: + quick Quick smoke test (1 min, low load) + standard Standard load test (5 min, medium load) + stress Stress test (10 min, high load) + endurance Endurance test (30 min, sustained load) + burst Burst test (variable load) + +EOF +} + +# Parse command line arguments +parse_args() { + while [[ $# -gt 0 ]]; do + case $1 in + -m|--mode) + TEST_MODE="$2" + shift 2 + ;; + -d|--duration) + TEST_DURATION="$2" + shift 2 + ;; + -p|--producers) + PRODUCER_COUNT="$2" + shift 2 + ;; + -c|--consumers) + CONSUMER_COUNT="$2" + shift 2 + ;; + -r|--rate) + MESSAGE_RATE="$2" + shift 2 + ;; + -s|--size) + MESSAGE_SIZE="$2" + shift 2 + ;; + -t|--topics) + TOPIC_COUNT="$2" + shift 2 + ;; + --partitions) + PARTITIONS_PER_TOPIC="$2" + shift 2 + ;; + --config) + CONFIG_FILE="$2" + shift 2 + ;; + --monitoring) + ENABLE_MONITORING=1 + shift + ;; + --wait-ready) + WAIT_READY=1 + shift + ;; + -v|--verbose) + VERBOSE=1 + shift + ;; + -h|--help) + show_usage + exit 0 + ;; + -*) + log_error "Unknown option: $1" + show_usage + exit 1 + ;; + *) + if [[ -z "${COMMAND:-}" ]]; then + COMMAND="$1" + else + log_error "Multiple commands specified" + show_usage + exit 1 + fi + shift + ;; + esac + done +} + +# Check if Docker and Docker Compose are available +check_dependencies() { + if ! command -v docker &> /dev/null; then + log_error "Docker is not installed or not in PATH" + exit 1 + fi + + if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then + log_error "Docker Compose is not installed or not in PATH" + exit 1 + fi + + # Use docker compose if available, otherwise docker-compose + if docker compose version &> /dev/null; then + DOCKER_COMPOSE="docker compose" + else + DOCKER_COMPOSE="docker-compose" + fi +} + +# Wait for services to be ready +wait_for_services() { + log_info "Waiting for services to be ready..." + + local timeout=300 # 5 minutes timeout + local elapsed=0 + local check_interval=5 + + while [[ $elapsed -lt $timeout ]]; do + if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps --format table | grep -q "healthy"; then + if check_service_health; then + log_success "All services are ready!" + return 0 + fi + fi + + sleep $check_interval + elapsed=$((elapsed + check_interval)) + log_info "Waiting... 
($elapsed/${timeout}s)" + done + + log_error "Services did not become ready within $timeout seconds" + return 1 +} + +# Check health of critical services +check_service_health() { + # Check Kafka Gateway + if ! curl -s http://localhost:9093 >/dev/null 2>&1; then + return 1 + fi + + # Check Schema Registry + if ! curl -s http://localhost:8081/subjects >/dev/null 2>&1; then + return 1 + fi + + return 0 +} + +# Start the load test infrastructure +start_services() { + log_info "Starting SeaweedFS Kafka load test infrastructure..." + + # Set environment variables + export TEST_MODE="$TEST_MODE" + export TEST_DURATION="$TEST_DURATION" + export PRODUCER_COUNT="$PRODUCER_COUNT" + export CONSUMER_COUNT="$CONSUMER_COUNT" + export MESSAGE_RATE="$MESSAGE_RATE" + export MESSAGE_SIZE="$MESSAGE_SIZE" + export TOPIC_COUNT="$TOPIC_COUNT" + export PARTITIONS_PER_TOPIC="$PARTITIONS_PER_TOPIC" + + # Start core services + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" up -d \ + seaweedfs-master \ + seaweedfs-volume \ + seaweedfs-filer \ + seaweedfs-mq-broker \ + kafka-gateway \ + schema-registry + + # Start monitoring if enabled + if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then + log_info "Starting monitoring stack..." + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d + fi + + # Wait for services to be ready if requested + if [[ "${WAIT_READY:-0}" == "1" ]]; then + wait_for_services + fi + + log_success "Infrastructure started successfully" +} + +# Run the load test +run_loadtest() { + log_info "Starting Kafka client load test..." + log_info "Mode: $TEST_MODE, Duration: $TEST_DURATION" + log_info "Producers: $PRODUCER_COUNT, Consumers: $CONSUMER_COUNT" + log_info "Message Rate: $MESSAGE_RATE msgs/sec, Size: $MESSAGE_SIZE bytes" + + # Run the load test + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest up --abort-on-container-exit kafka-client-loadtest + + # Show test results + show_results +} + +# Show test results +show_results() { + log_info "Load test completed! Gathering results..." + + # Get final metrics from the load test container + if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps kafka-client-loadtest-runner &>/dev/null; then + log_info "Final test statistics:" + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" exec -T kafka-client-loadtest-runner curl -s http://localhost:8080/stats || true + fi + + # Show Prometheus metrics if monitoring is enabled + if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then + log_info "Monitoring dashboards available at:" + log_info " Prometheus: http://localhost:9090" + log_info " Grafana: http://localhost:3000 (admin/admin)" + fi + + # Show where results are stored + if [[ -d "$PROJECT_DIR/test-results" ]]; then + log_info "Test results saved to: $PROJECT_DIR/test-results/" + fi +} + +# Stop services +stop_services() { + log_info "Stopping all services..." + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down + log_success "Services stopped" +} + +# Show service status +show_status() { + log_info "Service status:" + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps +} + +# Show logs +show_logs() { + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" logs -f "${1:-}" +} + +# Clean up all resources +clean_all() { + log_warning "This will remove all volumes, networks, and containers. Are you sure? (y/N)" + read -r response + if [[ "$response" =~ ^[Yy]$ ]]; then + log_info "Cleaning up all resources..." 
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down -v --remove-orphans + + # Remove any remaining volumes + docker volume ls -q | grep -E "(kafka-client-loadtest|seaweedfs)" | xargs -r docker volume rm + + # Remove networks + docker network ls -q | grep -E "kafka-client-loadtest" | xargs -r docker network rm + + log_success "Cleanup completed" + else + log_info "Cleanup cancelled" + fi +} + +# Run predefined scenarios +run_scenario() { + local scenario="$1" + + case "$scenario" in + quick) + TEST_MODE="comprehensive" + TEST_DURATION="1m" + PRODUCER_COUNT=2 + CONSUMER_COUNT=2 + MESSAGE_RATE=100 + MESSAGE_SIZE=512 + TOPIC_COUNT=2 + ;; + standard) + TEST_MODE="comprehensive" + TEST_DURATION="5m" + PRODUCER_COUNT=5 + CONSUMER_COUNT=3 + MESSAGE_RATE=500 + MESSAGE_SIZE=1024 + TOPIC_COUNT=3 + ;; + stress) + TEST_MODE="comprehensive" + TEST_DURATION="10m" + PRODUCER_COUNT=20 + CONSUMER_COUNT=10 + MESSAGE_RATE=2000 + MESSAGE_SIZE=2048 + TOPIC_COUNT=10 + ;; + endurance) + TEST_MODE="comprehensive" + TEST_DURATION="30m" + PRODUCER_COUNT=10 + CONSUMER_COUNT=5 + MESSAGE_RATE=1000 + MESSAGE_SIZE=1024 + TOPIC_COUNT=5 + ;; + burst) + TEST_MODE="comprehensive" + TEST_DURATION="10m" + PRODUCER_COUNT=10 + CONSUMER_COUNT=5 + MESSAGE_RATE=1000 + MESSAGE_SIZE=1024 + TOPIC_COUNT=5 + # Note: Burst behavior would be configured in the load test config + ;; + *) + log_error "Unknown scenario: $scenario" + log_info "Available scenarios: quick, standard, stress, endurance, burst" + exit 1 + ;; + esac + + log_info "Running $scenario scenario..." + start_services + if [[ "${WAIT_READY:-0}" == "1" ]]; then + wait_for_services + fi + run_loadtest +} + +# Main execution +main() { + if [[ $# -eq 0 ]]; then + show_usage + exit 0 + fi + + parse_args "$@" + check_dependencies + + case "${COMMAND:-}" in + start) + start_services + run_loadtest + ;; + stop) + stop_services + ;; + restart) + stop_services + start_services + ;; + status) + show_status + ;; + logs) + show_logs + ;; + clean) + clean_all + ;; + monitor) + ENABLE_MONITORING=1 + $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d + log_success "Monitoring stack started" + log_info "Prometheus: http://localhost:9090" + log_info "Grafana: http://localhost:3000 (admin/admin)" + ;; + scenarios) + if [[ -n "${2:-}" ]]; then + run_scenario "$2" + else + log_error "Please specify a scenario" + log_info "Available scenarios: quick, standard, stress, endurance, burst" + exit 1 + fi + ;; + *) + log_error "Unknown command: ${COMMAND:-}" + show_usage + exit 1 + ;; + esac +} + +# Set default values +ENABLE_MONITORING=0 +WAIT_READY=0 +VERBOSE=0 + +# Run main function +main "$@" diff --git a/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh b/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh new file mode 100755 index 000000000..3ea43f998 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh @@ -0,0 +1,352 @@ +#!/bin/bash + +# Setup monitoring for Kafka Client Load Test +# This script sets up Prometheus and Grafana configurations + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" +MONITORING_DIR="$PROJECT_DIR/monitoring" + +# Colors +GREEN='\033[0;32m' +BLUE='\033[0;34m' +NC='\033[0m' + +log_info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +# Create monitoring directory structure +setup_directories() { + log_info "Setting up monitoring directories..." 
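+    # Layout expected by the monitoring stack (presumably bind-mounted by the compose
+    # monitoring profile): prometheus/ for scrape config, grafana/provisioning/ for
+    # datasources and dashboard providers, grafana/dashboards/ for dashboard JSON files.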
+ + mkdir -p "$MONITORING_DIR/prometheus" + mkdir -p "$MONITORING_DIR/grafana/dashboards" + mkdir -p "$MONITORING_DIR/grafana/provisioning/dashboards" + mkdir -p "$MONITORING_DIR/grafana/provisioning/datasources" + + log_success "Directories created" +} + +# Create Prometheus configuration +create_prometheus_config() { + log_info "Creating Prometheus configuration..." + + cat > "$MONITORING_DIR/prometheus/prometheus.yml" << 'EOF' +# Prometheus configuration for Kafka Load Test monitoring + +global: + scrape_interval: 15s + evaluation_interval: 15s + +rule_files: + # - "first_rules.yml" + # - "second_rules.yml" + +scrape_configs: + # Scrape Prometheus itself + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + # Scrape load test metrics + - job_name: 'kafka-loadtest' + static_configs: + - targets: ['kafka-client-loadtest-runner:8080'] + scrape_interval: 5s + metrics_path: '/metrics' + + # Scrape SeaweedFS Master metrics + - job_name: 'seaweedfs-master' + static_configs: + - targets: ['seaweedfs-master:9333'] + metrics_path: '/metrics' + + # Scrape SeaweedFS Volume metrics + - job_name: 'seaweedfs-volume' + static_configs: + - targets: ['seaweedfs-volume:8080'] + metrics_path: '/metrics' + + # Scrape SeaweedFS Filer metrics + - job_name: 'seaweedfs-filer' + static_configs: + - targets: ['seaweedfs-filer:8888'] + metrics_path: '/metrics' + + # Scrape SeaweedFS MQ Broker metrics (if available) + - job_name: 'seaweedfs-mq-broker' + static_configs: + - targets: ['seaweedfs-mq-broker:17777'] + metrics_path: '/metrics' + scrape_interval: 10s + + # Scrape Kafka Gateway metrics (if available) + - job_name: 'kafka-gateway' + static_configs: + - targets: ['kafka-gateway:9093'] + metrics_path: '/metrics' + scrape_interval: 10s +EOF + + log_success "Prometheus configuration created" +} + +# Create Grafana datasource configuration +create_grafana_datasource() { + log_info "Creating Grafana datasource configuration..." + + cat > "$MONITORING_DIR/grafana/provisioning/datasources/datasource.yml" << 'EOF' +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + orgId: 1 + url: http://prometheus:9090 + basicAuth: false + isDefault: true + editable: true + version: 1 +EOF + + log_success "Grafana datasource configuration created" +} + +# Create Grafana dashboard provisioning +create_grafana_dashboard_provisioning() { + log_info "Creating Grafana dashboard provisioning..." + + cat > "$MONITORING_DIR/grafana/provisioning/dashboards/dashboard.yml" << 'EOF' +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards +EOF + + log_success "Grafana dashboard provisioning created" +} + +# Create Kafka Load Test dashboard +create_loadtest_dashboard() { + log_info "Creating Kafka Load Test Grafana dashboard..." 
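+    # The panels below assume the load test runner exposes kafka_loadtest_* counters,
+    # histograms, and gauges on :8080/metrics, i.e. the 'kafka-loadtest' scrape job
+    # configured in prometheus.yml above.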
+ + cat > "$MONITORING_DIR/grafana/dashboards/kafka-loadtest.json" << 'EOF' +{ + "dashboard": { + "id": null, + "title": "Kafka Client Load Test Dashboard", + "tags": ["kafka", "loadtest", "seaweedfs"], + "timezone": "browser", + "panels": [ + { + "id": 1, + "title": "Messages Produced/Consumed", + "type": "stat", + "targets": [ + { + "expr": "rate(kafka_loadtest_messages_produced_total[5m])", + "legendFormat": "Produced/sec" + }, + { + "expr": "rate(kafka_loadtest_messages_consumed_total[5m])", + "legendFormat": "Consumed/sec" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0} + }, + { + "id": 2, + "title": "Message Latency", + "type": "graph", + "targets": [ + { + "expr": "histogram_quantile(0.95, kafka_loadtest_message_latency_seconds)", + "legendFormat": "95th percentile" + }, + { + "expr": "histogram_quantile(0.99, kafka_loadtest_message_latency_seconds)", + "legendFormat": "99th percentile" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0} + }, + { + "id": 3, + "title": "Error Rates", + "type": "graph", + "targets": [ + { + "expr": "rate(kafka_loadtest_producer_errors_total[5m])", + "legendFormat": "Producer Errors/sec" + }, + { + "expr": "rate(kafka_loadtest_consumer_errors_total[5m])", + "legendFormat": "Consumer Errors/sec" + } + ], + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8} + }, + { + "id": 4, + "title": "Throughput (MB/s)", + "type": "graph", + "targets": [ + { + "expr": "rate(kafka_loadtest_bytes_produced_total[5m]) / 1024 / 1024", + "legendFormat": "Produced MB/s" + }, + { + "expr": "rate(kafka_loadtest_bytes_consumed_total[5m]) / 1024 / 1024", + "legendFormat": "Consumed MB/s" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16} + }, + { + "id": 5, + "title": "Active Clients", + "type": "stat", + "targets": [ + { + "expr": "kafka_loadtest_active_producers", + "legendFormat": "Producers" + }, + { + "expr": "kafka_loadtest_active_consumers", + "legendFormat": "Consumers" + } + ], + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16} + }, + { + "id": 6, + "title": "Consumer Lag", + "type": "graph", + "targets": [ + { + "expr": "kafka_loadtest_consumer_lag_messages", + "legendFormat": "{{consumer_group}}-{{topic}}-{{partition}}" + } + ], + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24} + } + ], + "time": {"from": "now-30m", "to": "now"}, + "refresh": "5s", + "schemaVersion": 16, + "version": 0 + } +} +EOF + + log_success "Kafka Load Test dashboard created" +} + +# Create SeaweedFS dashboard +create_seaweedfs_dashboard() { + log_info "Creating SeaweedFS Grafana dashboard..." 
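+    # Liveness-only dashboard: each panel charts up{job="..."} for one of the four
+    # SeaweedFS scrape jobs defined in prometheus.yml (master, volume, filer, mq-broker).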
+ + cat > "$MONITORING_DIR/grafana/dashboards/seaweedfs.json" << 'EOF' +{ + "dashboard": { + "id": null, + "title": "SeaweedFS Cluster Dashboard", + "tags": ["seaweedfs", "storage"], + "timezone": "browser", + "panels": [ + { + "id": 1, + "title": "Master Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-master\"}", + "legendFormat": "Master Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 0, "y": 0} + }, + { + "id": 2, + "title": "Volume Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-volume\"}", + "legendFormat": "Volume Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 6, "y": 0} + }, + { + "id": 3, + "title": "Filer Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-filer\"}", + "legendFormat": "Filer Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 12, "y": 0} + }, + { + "id": 4, + "title": "MQ Broker Status", + "type": "stat", + "targets": [ + { + "expr": "up{job=\"seaweedfs-mq-broker\"}", + "legendFormat": "MQ Broker Up" + } + ], + "gridPos": {"h": 4, "w": 6, "x": 18, "y": 0} + } + ], + "time": {"from": "now-30m", "to": "now"}, + "refresh": "10s", + "schemaVersion": 16, + "version": 0 + } +} +EOF + + log_success "SeaweedFS dashboard created" +} + +# Main setup function +main() { + log_info "Setting up monitoring for Kafka Client Load Test..." + + setup_directories + create_prometheus_config + create_grafana_datasource + create_grafana_dashboard_provisioning + create_loadtest_dashboard + create_seaweedfs_dashboard + + log_success "Monitoring setup completed!" + log_info "You can now start the monitoring stack with:" + log_info " ./scripts/run-loadtest.sh monitor" + log_info "" + log_info "After starting, access:" + log_info " Prometheus: http://localhost:9090" + log_info " Grafana: http://localhost:3000 (admin/admin)" +} + +main "$@" diff --git a/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh b/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh new file mode 100755 index 000000000..e1a2f73e2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh @@ -0,0 +1,151 @@ +#!/bin/bash + +# Test script to verify the retry logic works correctly +# Simulates Schema Registry eventual consistency behavior + +set -euo pipefail + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +log_info() { + echo -e "${BLUE}[TEST]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[PASS]${NC} $1" +} + +log_error() { + echo -e "${RED}[FAIL]${NC} $1" +} + +# Mock function that simulates Schema Registry eventual consistency +# First N attempts fail, then succeeds +mock_schema_registry_query() { + local subject=$1 + local min_attempts_to_succeed=$2 + local current_attempt=$3 + + if [[ $current_attempt -ge $min_attempts_to_succeed ]]; then + # Simulate successful response + echo '{"id":1,"version":1,"schema":"test"}' + return 0 + else + # Simulate 404 Not Found + echo '{"error_code":40401,"message":"Subject not found"}' + return 1 + fi +} + +# Simulate verify_schema_with_retry logic +test_verify_with_retry() { + local subject=$1 + local min_attempts_to_succeed=$2 + local max_attempts=5 + local attempt=1 + + log_info "Testing $subject (should succeed after $min_attempts_to_succeed attempts)" + + while [[ $attempt -le $max_attempts ]]; do + local response + if response=$(mock_schema_registry_query "$subject" "$min_attempts_to_succeed" "$attempt"); then + if echo "$response" | grep -q '"id"'; then + if [[ $attempt -gt 1 ]]; then + log_success 
"$subject verified after $attempt attempts" + else + log_success "$subject verified on first attempt" + fi + return 0 + fi + fi + + # Schema not found, wait and retry + if [[ $attempt -lt $max_attempts ]]; then + # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s + local wait_time=$(echo "scale=3; 0.1 * (2 ^ ($attempt - 1))" | bc) + log_info " Attempt $attempt failed, waiting ${wait_time}s before retry..." + sleep "$wait_time" + attempt=$((attempt + 1)) + else + log_error "$subject verification failed after $max_attempts attempts" + return 1 + fi + done + + return 1 +} + +# Run tests +log_info "==========================================" +log_info "Testing Schema Registry Retry Logic" +log_info "==========================================" +echo "" + +# Test 1: Schema available immediately +log_info "Test 1: Schema available immediately" +if test_verify_with_retry "immediate-schema" 1; then + log_success "✓ Test 1 passed" +else + log_error "✗ Test 1 failed" + exit 1 +fi +echo "" + +# Test 2: Schema available after 2 attempts (200ms delay) +log_info "Test 2: Schema available after 2 attempts" +if test_verify_with_retry "delayed-schema-2" 2; then + log_success "✓ Test 2 passed" +else + log_error "✗ Test 2 failed" + exit 1 +fi +echo "" + +# Test 3: Schema available after 3 attempts (600ms delay) +log_info "Test 3: Schema available after 3 attempts" +if test_verify_with_retry "delayed-schema-3" 3; then + log_success "✓ Test 3 passed" +else + log_error "✗ Test 3 failed" + exit 1 +fi +echo "" + +# Test 4: Schema available after 4 attempts (1400ms delay) +log_info "Test 4: Schema available after 4 attempts" +if test_verify_with_retry "delayed-schema-4" 4; then + log_success "✓ Test 4 passed" +else + log_error "✗ Test 4 failed" + exit 1 +fi +echo "" + +# Test 5: Schema never available (should fail) +log_info "Test 5: Schema never available (should fail gracefully)" +if test_verify_with_retry "missing-schema" 10; then + log_error "✗ Test 5 failed (should have failed but passed)" + exit 1 +else + log_success "✓ Test 5 passed (correctly failed after max attempts)" +fi +echo "" + +log_success "==========================================" +log_success "All tests passed! ✓" +log_success "==========================================" +log_info "" +log_info "Summary:" +log_info "- Immediate availability: works ✓" +log_info "- 2-4 retry attempts: works ✓" +log_info "- Max attempts handling: works ✓" +log_info "- Exponential backoff: works ✓" +log_info "" +log_info "Total retry time budget: ~1.5 seconds (0.1+0.2+0.4+0.8)" +log_info "This should handle Schema Registry consumer lag gracefully." 
+ diff --git a/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh b/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh new file mode 100755 index 000000000..d2560728b --- /dev/null +++ b/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh @@ -0,0 +1,291 @@ +#!/bin/bash + +# Wait for SeaweedFS and Kafka Gateway services to be ready +# This script checks service health and waits until all services are operational + +set -euo pipefail + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +log_info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +log_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +log_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Configuration +TIMEOUT=${TIMEOUT:-300} # 5 minutes default timeout +CHECK_INTERVAL=${CHECK_INTERVAL:-5} # Check every 5 seconds +SEAWEEDFS_MASTER_URL=${SEAWEEDFS_MASTER_URL:-"http://localhost:9333"} +KAFKA_GATEWAY_URL=${KAFKA_GATEWAY_URL:-"localhost:9093"} +SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"} +SEAWEEDFS_FILER_URL=${SEAWEEDFS_FILER_URL:-"http://localhost:8888"} + +# Check if a service is reachable +check_http_service() { + local url=$1 + local name=$2 + + if curl -sf "$url" >/dev/null 2>&1; then + return 0 + else + return 1 + fi +} + +# Check TCP port +check_tcp_service() { + local host=$1 + local port=$2 + local name=$3 + + if timeout 3 bash -c "</dev/tcp/$host/$port" 2>/dev/null; then + return 0 + else + return 1 + fi +} + +# Check SeaweedFS Master +check_seaweedfs_master() { + if check_http_service "$SEAWEEDFS_MASTER_URL/cluster/status" "SeaweedFS Master"; then + # Additional check: ensure cluster has volumes + local status_json + status_json=$(curl -s "$SEAWEEDFS_MASTER_URL/cluster/status" 2>/dev/null || echo "{}") + + # Check if we have at least one volume server + if echo "$status_json" | grep -q '"Max":0'; then + log_warning "SeaweedFS Master is running but no volumes are available" + return 1 + fi + + return 0 + fi + return 1 +} + +# Check SeaweedFS Filer +check_seaweedfs_filer() { + check_http_service "$SEAWEEDFS_FILER_URL/" "SeaweedFS Filer" +} + +# Check Kafka Gateway +check_kafka_gateway() { + local host="localhost" + local port="9093" + check_tcp_service "$host" "$port" "Kafka Gateway" +} + +# Check Schema Registry +check_schema_registry() { + # Check if Schema Registry container is running first + if ! 
docker compose ps schema-registry | grep -q "Up"; then + # Schema Registry is not running, which is okay for basic tests + return 0 + fi + + # FIXED: Wait for Docker healthcheck to report "healthy", not just "Up" + # Schema Registry has a 30s start_period, so we need to wait for the actual healthcheck + local health_status + health_status=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "none") + + # If container has no healthcheck or healthcheck is not yet healthy, check HTTP directly + if [[ "$health_status" == "healthy" ]]; then + # Container reports healthy, do a final verification + if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then + return 0 + fi + elif [[ "$health_status" == "starting" ]]; then + # Still in startup period, wait longer + return 1 + elif [[ "$health_status" == "none" ]]; then + # No healthcheck defined (shouldn't happen), fall back to HTTP check + if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then + local subjects + subjects=$(curl -s "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null || echo "[]") + + # Schema registry should at least return an empty array + if [[ "$subjects" == "[]" ]]; then + return 0 + elif echo "$subjects" | grep -q '\['; then + return 0 + else + log_warning "Schema Registry is not properly connected" + return 1 + fi + fi + fi + return 1 +} + +# Check MQ Broker +check_mq_broker() { + check_tcp_service "localhost" "17777" "SeaweedFS MQ Broker" +} + +# Main health check function +check_all_services() { + local all_healthy=true + + log_info "Checking service health..." + + # Check SeaweedFS Master + if check_seaweedfs_master; then + log_success "✓ SeaweedFS Master is healthy" + else + log_error "✗ SeaweedFS Master is not ready" + all_healthy=false + fi + + # Check SeaweedFS Filer + if check_seaweedfs_filer; then + log_success "✓ SeaweedFS Filer is healthy" + else + log_error "✗ SeaweedFS Filer is not ready" + all_healthy=false + fi + + # Check MQ Broker + if check_mq_broker; then + log_success "✓ SeaweedFS MQ Broker is healthy" + else + log_error "✗ SeaweedFS MQ Broker is not ready" + all_healthy=false + fi + + # Check Kafka Gateway + if check_kafka_gateway; then + log_success "✓ Kafka Gateway is healthy" + else + log_error "✗ Kafka Gateway is not ready" + all_healthy=false + fi + + # Check Schema Registry + if ! docker compose ps schema-registry | grep -q "Up"; then + log_warning "⚠ Schema Registry is stopped (skipping)" + elif check_schema_registry; then + log_success "✓ Schema Registry is healthy" + else + # Check if it's still starting up (healthcheck start_period) + local health_status + health_status=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "unknown") + if [[ "$health_status" == "starting" ]]; then + log_warning "⏳ Schema Registry is starting (waiting for healthcheck...)" + else + log_error "✗ Schema Registry is not ready (status: $health_status)" + fi + all_healthy=false + fi + + $all_healthy +} + +# Wait for all services to be ready +wait_for_services() { + log_info "Waiting for all services to be ready (timeout: ${TIMEOUT}s)..." + + local elapsed=0 + + while [[ $elapsed -lt $TIMEOUT ]]; do + if check_all_services; then + log_success "All services are ready! (took ${elapsed}s)" + return 0 + fi + + log_info "Some services are not ready yet. Waiting ${CHECK_INTERVAL}s... 
(${elapsed}/${TIMEOUT}s)" + sleep $CHECK_INTERVAL + elapsed=$((elapsed + CHECK_INTERVAL)) + done + + log_error "Services did not become ready within ${TIMEOUT} seconds" + log_error "Final service status:" + check_all_services + + # Always dump Schema Registry diagnostics on timeout since it's the problematic service + log_error "===========================================" + log_error "Schema Registry Container Status:" + log_error "===========================================" + docker compose ps schema-registry 2>&1 || echo "Failed to get container status" + docker inspect loadtest-schema-registry --format='Health: {{.State.Health.Status}} ({{len .State.Health.Log}} checks)' 2>&1 || echo "Failed to inspect container" + log_error "===========================================" + + log_error "Network Connectivity Check:" + log_error "===========================================" + log_error "Can Schema Registry reach Kafka Gateway?" + docker compose exec -T schema-registry ping -c 3 kafka-gateway 2>&1 || echo "Ping failed" + docker compose exec -T schema-registry nc -zv kafka-gateway 9093 2>&1 || echo "Port 9093 unreachable" + log_error "===========================================" + + log_error "Schema Registry Logs (last 100 lines):" + log_error "===========================================" + docker compose logs --tail=100 schema-registry 2>&1 || echo "Failed to get Schema Registry logs" + log_error "===========================================" + + log_error "Kafka Gateway Logs (last 50 lines with 'SR' prefix):" + log_error "===========================================" + docker compose logs --tail=200 kafka-gateway 2>&1 | grep -i "SR" | tail -50 || echo "No SR-related logs found in Kafka Gateway" + log_error "===========================================" + + log_error "MQ Broker Logs (last 30 lines):" + log_error "===========================================" + docker compose logs --tail=30 seaweedfs-mq-broker 2>&1 || echo "Failed to get MQ Broker logs" + log_error "===========================================" + + return 1 +} + +# Show current service status +show_status() { + log_info "Current service status:" + check_all_services +} + +# Main function +main() { + case "${1:-wait}" in + "wait") + wait_for_services + ;; + "check") + show_status + ;; + "status") + show_status + ;; + *) + echo "Usage: $0 [wait|check|status]" + echo "" + echo "Commands:" + echo " wait - Wait for all services to be ready (default)" + echo " check - Check current service status" + echo " status - Same as check" + echo "" + echo "Environment variables:" + echo " TIMEOUT - Maximum time to wait in seconds (default: 300)" + echo " CHECK_INTERVAL - Check interval in seconds (default: 5)" + echo " SEAWEEDFS_MASTER_URL - Master URL (default: http://localhost:9333)" + echo " KAFKA_GATEWAY_URL - Gateway URL (default: localhost:9093)" + echo " SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)" + echo " SEAWEEDFS_FILER_URL - Filer URL (default: http://localhost:8888)" + exit 1 + ;; + esac +} + +main "$@" diff --git a/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java new file mode 100644 index 000000000..f511b4cf6 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java @@ -0,0 +1,290 @@ +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.Node; + 
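+// Debugging aid: first replays the ApiVersions exchange over a raw socket to hex-dump
+// the gateway's response bytes, then runs the same broker through a real AdminClient.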
+import java.io.*; +import java.net.*; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ExecutionException; + +public class AdminClientDebugger { + + public static void main(String[] args) throws Exception { + String broker = args.length > 0 ? args[0] : "localhost:9093"; + + System.out.println("=".repeat(80)); + System.out.println("KAFKA ADMINCLIENT DEBUGGER"); + System.out.println("=".repeat(80)); + System.out.println("Target broker: " + broker); + + // Test 1: Raw socket - capture exact bytes + System.out.println("\n" + "=".repeat(80)); + System.out.println("TEST 1: Raw Socket - Capture ApiVersions Exchange"); + System.out.println("=".repeat(80)); + testRawSocket(broker); + + // Test 2: AdminClient with detailed logging + System.out.println("\n" + "=".repeat(80)); + System.out.println("TEST 2: AdminClient with Logging"); + System.out.println("=".repeat(80)); + testAdminClient(broker); + } + + private static void testRawSocket(String broker) { + String[] parts = broker.split(":"); + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + try (Socket socket = new Socket(host, port)) { + socket.setSoTimeout(10000); + + InputStream in = socket.getInputStream(); + OutputStream out = socket.getOutputStream(); + + System.out.println("Connected to " + broker); + + // Build ApiVersions request (v4) + // Format: + // [Size][ApiKey=18][ApiVersion=4][CorrelationId=0][ClientId][TaggedFields] + ByteArrayOutputStream requestBody = new ByteArrayOutputStream(); + + // ApiKey (2 bytes) = 18 + requestBody.write(0); + requestBody.write(18); + + // ApiVersion (2 bytes) = 4 + requestBody.write(0); + requestBody.write(4); + + // CorrelationId (4 bytes) = 0 + requestBody.write(new byte[] { 0, 0, 0, 0 }); + + // ClientId (compact string) = "debug-client" + String clientId = "debug-client"; + writeCompactString(requestBody, clientId); + + // Tagged fields (empty) + requestBody.write(0x00); + + byte[] request = requestBody.toByteArray(); + + // Write size + ByteBuffer sizeBuffer = ByteBuffer.allocate(4); + sizeBuffer.putInt(request.length); + out.write(sizeBuffer.array()); + + // Write request + out.write(request); + out.flush(); + + System.out.println("\nSENT ApiVersions v4 Request:"); + System.out.println(" Size: " + request.length + " bytes"); + hexDump(" Request", request, Math.min(64, request.length)); + + // Read response size + byte[] sizeBytes = new byte[4]; + int read = in.read(sizeBytes); + if (read != 4) { + System.out.println("Failed to read response size (got " + read + " bytes)"); + return; + } + + int responseSize = ByteBuffer.wrap(sizeBytes).getInt(); + System.out.println("\nRECEIVED Response:"); + System.out.println(" Size: " + responseSize + " bytes"); + + // Read response body + byte[] responseBytes = new byte[responseSize]; + int totalRead = 0; + while (totalRead < responseSize) { + int n = in.read(responseBytes, totalRead, responseSize - totalRead); + if (n == -1) { + System.out.println("Unexpected EOF after " + totalRead + " bytes"); + return; + } + totalRead += n; + } + + System.out.println(" Read complete response: " + totalRead + " bytes"); + + // Decode response + System.out.println("\nRESPONSE STRUCTURE:"); + decodeApiVersionsResponse(responseBytes); + + // Try to read more (should timeout or get EOF) + System.out.println("\n⏱️ Waiting for any additional data (10s timeout)..."); + socket.setSoTimeout(10000); + try { + int nextByte = in.read(); + if (nextByte == -1) { + System.out.println(" Server closed connection (EOF)"); + } else { + 
System.out.println(" Unexpected data: " + nextByte); + } + } catch (SocketTimeoutException e) { + System.out.println(" Timeout - no additional data"); + } + + } catch (Exception e) { + System.out.println("Error: " + e.getMessage()); + e.printStackTrace(); + } + } + + private static void testAdminClient(String broker) { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-client-debugger"); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); + props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000); + + System.out.println("Creating AdminClient with config:"); + props.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + + try (AdminClient adminClient = AdminClient.create(props)) { + System.out.println("AdminClient created"); + + // Give the thread time to start + Thread.sleep(1000); + + System.out.println("\nCalling describeCluster()..."); + DescribeClusterResult result = adminClient.describeCluster(); + + System.out.println(" Waiting for nodes..."); + Collection<Node> nodes = result.nodes().get(); + + System.out.println("Cluster description retrieved:"); + System.out.println(" Nodes: " + nodes.size()); + for (Node node : nodes) { + System.out.println(" - Node " + node.id() + ": " + node.host() + ":" + node.port()); + } + + System.out.println("\n Cluster ID: " + result.clusterId().get()); + + Node controller = result.controller().get(); + if (controller != null) { + System.out.println(" Controller: Node " + controller.id()); + } + + } catch (ExecutionException e) { + System.out.println("Execution error: " + e.getCause().getMessage()); + e.getCause().printStackTrace(); + } catch (Exception e) { + System.out.println("Error: " + e.getMessage()); + e.printStackTrace(); + } + } + + private static void decodeApiVersionsResponse(byte[] data) { + int offset = 0; + + try { + // Correlation ID (4 bytes) + int correlationId = ByteBuffer.wrap(data, offset, 4).getInt(); + System.out.println(" [Offset " + offset + "] Correlation ID: " + correlationId); + offset += 4; + + // Header tagged fields (varint - should be 0x00 for flexible v3+) + int taggedFieldsLength = readUnsignedVarint(data, offset); + System.out.println(" [Offset " + offset + "] Header Tagged Fields Length: " + taggedFieldsLength); + offset += varintSize(data[offset]); + + // Error code (2 bytes) + short errorCode = ByteBuffer.wrap(data, offset, 2).getShort(); + System.out.println(" [Offset " + offset + "] Error Code: " + errorCode); + offset += 2; + + // API Keys array (compact array - varint length) + int apiKeysLength = readUnsignedVarint(data, offset) - 1; // Compact array: length+1 + System.out.println(" [Offset " + offset + "] API Keys Count: " + apiKeysLength); + offset += varintSize(data[offset]); + + // Show first few API keys + System.out.println(" First 5 API Keys:"); + for (int i = 0; i < Math.min(5, apiKeysLength); i++) { + short apiKey = ByteBuffer.wrap(data, offset, 2).getShort(); + offset += 2; + short minVersion = ByteBuffer.wrap(data, offset, 2).getShort(); + offset += 2; + short maxVersion = ByteBuffer.wrap(data, offset, 2).getShort(); + offset += 2; + // Per-element tagged fields + int perElementTagged = readUnsignedVarint(data, offset); + offset += varintSize(data[offset]); + + System.out.println(" " + (i + 1) + ". API " + apiKey + ": v" + minVersion + "-v" + maxVersion); + } + + System.out.println(" ... 
(showing first 5 of " + apiKeysLength + " APIs)"); + System.out.println(" Response structure is valid!"); + + // Hex dump of first 64 bytes + hexDump("\n First 64 bytes", data, Math.min(64, data.length)); + + } catch (Exception e) { + System.out.println(" Failed to decode at offset " + offset + ": " + e.getMessage()); + hexDump(" Raw bytes", data, Math.min(128, data.length)); + } + } + + private static int readUnsignedVarint(byte[] data, int offset) { + int value = 0; + int shift = 0; + while (true) { + byte b = data[offset++]; + value |= (b & 0x7F) << shift; + if ((b & 0x80) == 0) + break; + shift += 7; + } + return value; + } + + private static int varintSize(byte firstByte) { + int size = 1; + byte b = firstByte; + while ((b & 0x80) != 0) { + size++; + b = (byte) (b << 1); + } + return size; + } + + private static void writeCompactString(ByteArrayOutputStream out, String str) { + byte[] bytes = str.getBytes(); + writeUnsignedVarint(out, bytes.length + 1); // Compact string: length+1 + out.write(bytes, 0, bytes.length); + } + + private static void writeUnsignedVarint(ByteArrayOutputStream out, int value) { + while ((value & ~0x7F) != 0) { + out.write((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + out.write((byte) value); + } + + private static void hexDump(String label, byte[] data, int length) { + System.out.println(label + " (hex dump):"); + for (int i = 0; i < length; i += 16) { + System.out.printf(" %04x ", i); + for (int j = 0; j < 16; j++) { + if (i + j < length) { + System.out.printf("%02x ", data[i + j] & 0xFF); + } else { + System.out.print(" "); + } + if (j == 7) + System.out.print(" "); + } + System.out.print(" |"); + for (int j = 0; j < 16 && i + j < length; j++) { + byte b = data[i + j]; + System.out.print((b >= 32 && b < 127) ? (char) b : '.'); + } + System.out.println("|"); + } + } +} diff --git a/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java b/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java new file mode 100644 index 000000000..177a86233 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java @@ -0,0 +1,72 @@ +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.ListTopicsResult; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +public class JavaAdminClientTest { + public static void main(String[] args) { + // Set uncaught exception handler to catch AdminClient thread errors + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + System.err.println("UNCAUGHT EXCEPTION in thread " + t.getName() + ":"); + e.printStackTrace(); + }); + + String bootstrapServers = args.length > 0 ? 
args[0] : "localhost:9093"; + + System.out.println("Testing Kafka wire protocol with broker: " + bootstrapServers); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); + props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000); + props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-admin-test"); + props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000); + props.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 10000); + props.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 30000); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + props.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 50); + props.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000); + + System.out.println("Creating AdminClient with config:"); + props.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + + try (AdminClient adminClient = AdminClient.create(props)) { + System.out.println("AdminClient created successfully"); + Thread.sleep(2000); // Give it time to initialize + + // Test 1: Describe Cluster (uses Metadata API internally) + System.out.println("\n=== Test 1: Describe Cluster ==="); + try { + DescribeClusterResult clusterResult = adminClient.describeCluster(); + String clusterId = clusterResult.clusterId().get(10, TimeUnit.SECONDS); + int nodeCount = clusterResult.nodes().get(10, TimeUnit.SECONDS).size(); + System.out.println("Cluster ID: " + clusterId); + System.out.println("Nodes: " + nodeCount); + } catch (Exception e) { + System.err.println("Describe Cluster failed: " + e.getMessage()); + e.printStackTrace(); + } + + // Test 2: List Topics + System.out.println("\n=== Test 2: List Topics ==="); + try { + ListTopicsResult topicsResult = adminClient.listTopics(); + int topicCount = topicsResult.names().get(10, TimeUnit.SECONDS).size(); + System.out.println("Topics: " + topicCount); + } catch (Exception e) { + System.err.println("List Topics failed: " + e.getMessage()); + e.printStackTrace(); + } + + System.out.println("\nAll tests completed!"); + + } catch (Exception e) { + System.err.println("AdminClient creation failed: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } +} diff --git a/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java b/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java new file mode 100644 index 000000000..41c884544 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java @@ -0,0 +1,82 @@ +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class JavaKafkaConsumer { + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: java JavaKafkaConsumer <broker> <topic>"); + System.exit(1); + } + + String broker = args[0]; + String topic = args[1]; + + System.out.println("Connecting to Kafka broker: " + broker); + System.out.println("Topic: " + topic); + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "java-test-group"); + 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); + props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); + props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000"); + + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(topic)); + + System.out.println("Starting to consume messages..."); + + int messageCount = 0; + int errorCount = 0; + long startTime = System.currentTimeMillis(); + + try { + while (true) { + try { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); + + for (ConsumerRecord<String, String> record : records) { + messageCount++; + System.out.printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s%n", + messageCount, record.topic(), record.partition(), record.offset(), + record.key(), record.value()); + } + + // Stop after 100 messages or 60 seconds + if (messageCount >= 100 || (System.currentTimeMillis() - startTime) > 60000) { + long duration = System.currentTimeMillis() - startTime; + System.out.printf("%nSuccessfully consumed %d messages in %dms%n", messageCount, duration); + System.out.printf("Success rate: %.1f%% (%d/%d including errors)%n", + (double) messageCount / (messageCount + errorCount) * 100, messageCount, + messageCount + errorCount); + break; + } + } catch (Exception e) { + errorCount++; + System.err.printf("Error during poll #%d: %s%n", errorCount, e.getMessage()); + e.printStackTrace(); + + // Stop after 10 consecutive errors or 60 seconds + if (errorCount > 10 || (System.currentTimeMillis() - startTime) > 60000) { + long duration = System.currentTimeMillis() - startTime; + System.err.printf("%nStopping after %d errors in %dms%n", errorCount, duration); + break; + } + } + } + } finally { + consumer.close(); + } + } +} diff --git a/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java b/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java new file mode 100644 index 000000000..e9898d5f0 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java @@ -0,0 +1,68 @@ +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; +import java.util.concurrent.Future; + +public class JavaProducerTest { + public static void main(String[] args) { + String bootstrapServers = args.length > 0 ? args[0] : "localhost:9093"; + String topicName = args.length > 1 ? 
args[1] : "test-topic"; + + System.out.println("Testing Kafka Producer with broker: " + bootstrapServers); + System.out.println(" Topic: " + topicName); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "java-producer-test"); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); + + System.out.println("Creating Producer with config:"); + props.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + + try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) { + System.out.println("Producer created successfully"); + + // Try to send a test message + System.out.println("\n=== Test: Send Message ==="); + try { + ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key1", "value1"); + System.out.println("Sending record to topic: " + topicName); + Future<RecordMetadata> future = producer.send(record); + + RecordMetadata metadata = future.get(); // This will block and wait for response + System.out.println("Message sent successfully!"); + System.out.println(" Topic: " + metadata.topic()); + System.out.println(" Partition: " + metadata.partition()); + System.out.println(" Offset: " + metadata.offset()); + } catch (Exception e) { + System.err.println("Send failed: " + e.getMessage()); + e.printStackTrace(); + + // Print cause chain + Throwable cause = e.getCause(); + int depth = 1; + while (cause != null && depth < 5) { + System.err.println( + " Cause " + depth + ": " + cause.getClass().getName() + ": " + cause.getMessage()); + cause = cause.getCause(); + depth++; + } + } + + System.out.println("\nTest completed!"); + + } catch (Exception e) { + System.err.println("Producer creation or operation failed: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } +} diff --git a/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java b/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java new file mode 100644 index 000000000..3c33ae0ea --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java @@ -0,0 +1,124 @@ +package tools; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; + +public class SchemaRegistryTest { + private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081"; + + public static void main(String[] args) { + System.out.println("================================================================================"); + System.out.println("Schema Registry Test - Verifying In-Memory Read Optimization"); + System.out.println("================================================================================\n"); + + SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 100); + boolean allTestsPassed = true; + + try { + // Test 1: Register first schema + System.out.println("Test 1: Registering first schema (user-value)..."); + Schema userValueSchema = SchemaBuilder + .record("User").fields() + .requiredString("name") + .requiredInt("age") + .endRecord(); + + long startTime = System.currentTimeMillis(); + int 
schema1Id = schemaRegistry.register("user-value", userValueSchema); + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println("✓ SUCCESS: Schema registered with ID: " + schema1Id + " (took " + elapsedTime + "ms)"); + + // Test 2: Register second schema immediately (tests read-after-write) + System.out.println("\nTest 2: Registering second schema immediately (user-key)..."); + Schema userKeySchema = SchemaBuilder + .record("UserKey").fields() + .requiredString("userId") + .endRecord(); + + startTime = System.currentTimeMillis(); + int schema2Id = schemaRegistry.register("user-key", userKeySchema); + elapsedTime = System.currentTimeMillis() - startTime; + System.out.println("✓ SUCCESS: Schema registered with ID: " + schema2Id + " (took " + elapsedTime + "ms)"); + + // Test 3: Rapid fire registrations (tests concurrent writes) + System.out.println("\nTest 3: Rapid fire registrations (10 schemas in parallel)..."); + startTime = System.currentTimeMillis(); + Thread[] threads = new Thread[10]; + final boolean[] results = new boolean[10]; + + for (int i = 0; i < 10; i++) { + final int index = i; + threads[i] = new Thread(() -> { + try { + Schema schema = SchemaBuilder + .record("Test" + index).fields() + .requiredString("field" + index) + .endRecord(); + schemaRegistry.register("test-" + index + "-value", schema); + results[index] = true; + } catch (Exception e) { + System.err.println("✗ ERROR in thread " + index + ": " + e.getMessage()); + results[index] = false; + } + }); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + elapsedTime = System.currentTimeMillis() - startTime; + int successCount = 0; + for (boolean result : results) { + if (result) successCount++; + } + + if (successCount == 10) { + System.out.println("✓ SUCCESS: All 10 schemas registered (took " + elapsedTime + "ms total, ~" + (elapsedTime / 10) + "ms per schema)"); + } else { + System.out.println("✗ PARTIAL FAILURE: Only " + successCount + "/10 schemas registered"); + allTestsPassed = false; + } + + // Test 4: Verify we can retrieve all schemas + System.out.println("\nTest 4: Verifying all schemas are retrievable..."); + startTime = System.currentTimeMillis(); + Schema retrieved1 = schemaRegistry.getById(schema1Id); + Schema retrieved2 = schemaRegistry.getById(schema2Id); + elapsedTime = System.currentTimeMillis() - startTime; + + if (retrieved1.equals(userValueSchema) && retrieved2.equals(userKeySchema)) { + System.out.println("✓ SUCCESS: All schemas retrieved correctly (took " + elapsedTime + "ms)"); + } else { + System.out.println("✗ FAILURE: Schema mismatch"); + allTestsPassed = false; + } + + // Summary + System.out.println("\n==============================================================================="); + if (allTestsPassed) { + System.out.println("✓ ALL TESTS PASSED!"); + System.out.println("==============================================================================="); + System.out.println("\nOptimization verified:"); + System.out.println("- ForceFlush is NO LONGER NEEDED"); + System.out.println("- Subscribers read from in-memory buffer using IsOffsetInMemory()"); + System.out.println("- Per-subscriber notification channels provide instant wake-up"); + System.out.println("- True concurrent writes without serialization"); + System.exit(0); + } else { + System.out.println("✗ SOME TESTS FAILED"); + System.out.println("==============================================================================="); + System.exit(1); + } + + } catch (Exception e) { + 
System.err.println("\n✗ FATAL ERROR: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } +} + diff --git a/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java b/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java new file mode 100644 index 000000000..f334c045a --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java @@ -0,0 +1,78 @@ +import java.net.*; +import java.nio.*; +import java.nio.channels.*; + +public class TestSocketReadiness { + public static void main(String[] args) throws Exception { + String host = args.length > 0 ? args[0] : "localhost"; + int port = args.length > 1 ? Integer.parseInt(args[1]) : 9093; + + System.out.println("Testing socket readiness with " + host + ":" + port); + + // Test 1: Simple blocking connect + System.out.println("\n=== Test 1: Blocking Socket ==="); + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(host, port), 5000); + System.out.println("Blocking socket connected"); + System.out.println(" Available bytes: " + socket.getInputStream().available()); + Thread.sleep(100); + System.out.println(" Available bytes after 100ms: " + socket.getInputStream().available()); + } catch (Exception e) { + System.err.println("Blocking socket failed: " + e.getMessage()); + } + + // Test 2: Non-blocking NIO socket (like Kafka client uses) + System.out.println("\n=== Test 2: Non-blocking NIO Socket ==="); + Selector selector = Selector.open(); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + + try { + boolean connected = channel.connect(new InetSocketAddress(host, port)); + System.out.println(" connect() returned: " + connected); + + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); + + int ready = selector.select(5000); + System.out.println(" selector.select() returned: " + ready); + + if (ready > 0) { + for (SelectionKey k : selector.selectedKeys()) { + if (k.isConnectable()) { + System.out.println(" isConnectable: true"); + boolean finished = channel.finishConnect(); + System.out.println(" finishConnect() returned: " + finished); + + if (finished) { + k.interestOps(SelectionKey.OP_READ); + + // Now check if immediately readable (THIS is what might be wrong) + selector.selectedKeys().clear(); + int readReady = selector.selectNow(); + System.out.println(" Immediately after connect, selectNow() = " + readReady); + + if (readReady > 0) { + System.out.println(" Socket is IMMEDIATELY readable (unexpected!)"); + ByteBuffer buf = ByteBuffer.allocate(1); + int bytesRead = channel.read(buf); + System.out.println(" read() returned: " + bytesRead); + } else { + System.out.println(" Socket is NOT immediately readable (correct)"); + } + } + } + } + } + + System.out.println("NIO socket test completed"); + } catch (Exception e) { + System.err.println("NIO socket failed: " + e.getMessage()); + e.printStackTrace(); + } finally { + channel.close(); + selector.close(); + } + + System.out.println("\nAll tests completed"); + } +} diff --git a/test/kafka/kafka-client-loadtest/tools/go.mod b/test/kafka/kafka-client-loadtest/tools/go.mod new file mode 100644 index 000000000..c63d94230 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/go.mod @@ -0,0 +1,10 @@ +module simple-test + +go 1.24.7 + +require github.com/segmentio/kafka-go v0.4.49 + +require ( + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect +) diff --git a/test/kafka/kafka-client-loadtest/tools/go.sum 
b/test/kafka/kafka-client-loadtest/tools/go.sum new file mode 100644 index 000000000..74b476c2d --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/go.sum @@ -0,0 +1,24 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= +github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go new file mode 100644 index 000000000..1da40c89f --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + "github.com/segmentio/kafka-go" +) + +func main() { + if len(os.Args) < 3 { + log.Fatal("Usage: kafka-go-consumer <broker> <topic>") + } + broker := os.Args[1] + topic := os.Args[2] + + log.Printf("Connecting to Kafka broker: %s", broker) + log.Printf("Topic: %s", topic) + + // Create a new reader + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{broker}, + Topic: topic, + GroupID: "kafka-go-test-group", + MinBytes: 1, + MaxBytes: 10e6, // 10MB + MaxWait: 1 * time.Second, + }) + defer r.Close() + + log.Printf("Starting to consume messages...") + + ctx := context.Background() + messageCount := 0 + errorCount := 0 + startTime := time.Now() + + for { + m, err := r.ReadMessage(ctx) + if err != nil { + errorCount++ + log.Printf("Error reading message #%d: %v", messageCount+1, err) + + // Stop after 10 consecutive errors or 60 seconds + if errorCount > 10 || time.Since(startTime) > 60*time.Second { + log.Printf("\nStopping after %d errors in %v", errorCount, 
time.Since(startTime)) + break + } + continue + } + + // Reset error count on successful read + errorCount = 0 + messageCount++ + + log.Printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s", + messageCount, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) + + // Stop after 100 messages or 60 seconds + if messageCount >= 100 || time.Since(startTime) > 60*time.Second { + log.Printf("\nSuccessfully consumed %d messages in %v", messageCount, time.Since(startTime)) + log.Printf("Success rate: %.1f%% (%d/%d including errors)", + float64(messageCount)/float64(messageCount+errorCount)*100, messageCount, messageCount+errorCount) + break + } + } +} diff --git a/test/kafka/kafka-client-loadtest/tools/log4j.properties b/test/kafka/kafka-client-loadtest/tools/log4j.properties new file mode 100644 index 000000000..ed0cd0fe5 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/log4j.properties @@ -0,0 +1,12 @@ +log4j.rootLogger=DEBUG, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c: %m%n + +# More verbose for Kafka client +log4j.logger.org.apache.kafka=DEBUG +log4j.logger.org.apache.kafka.clients=TRACE +log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE + + diff --git a/test/kafka/kafka-client-loadtest/tools/pom.xml b/test/kafka/kafka-client-loadtest/tools/pom.xml new file mode 100644 index 000000000..58a858e95 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.seaweedfs.test</groupId> + <artifactId>kafka-consumer-test</artifactId> + <version>1.0-SNAPSHOT</version> + + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + <kafka.version>3.9.1</kafka.version> + <confluent.version>7.6.0</confluent.version> + </properties> + + <repositories> + <repository> + <id>confluent</id> + <url>https://packages.confluent.io/maven/</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>io.confluent</groupId> + <artifactId>kafka-schema-registry-client</artifactId> + <version>${confluent.version}</version> + </dependency> + <dependency> + <groupId>io.confluent</groupId> + <artifactId>kafka-avro-serializer</artifactId> + <version>${confluent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.11.4</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>2.0.9</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.11.0</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>3.1.0</version> + <configuration> + <mainClass>tools.SchemaRegistryTest</mainClass> + </configuration> + </plugin> + </plugins> + </build> 
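+    <!-- Typical usage (assumes Maven 3.6+ and the load test stack running locally):
+         `mvn compile exec:java` builds the tools and runs tools.SchemaRegistryTest,
+         the main class configured for exec-maven-plugin above. -->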
+</project> + + diff --git a/test/kafka/kafka-client-loadtest/tools/simple-test b/test/kafka/kafka-client-loadtest/tools/simple-test Binary files differnew file mode 100755 index 000000000..47eef7386 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/tools/simple-test diff --git a/test/kafka/kafka-client-loadtest/verify_schema_formats.sh b/test/kafka/kafka-client-loadtest/verify_schema_formats.sh new file mode 100755 index 000000000..6ded75b33 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/verify_schema_formats.sh @@ -0,0 +1,63 @@ +#!/bin/bash +# Verify schema format distribution across topics + +set -e + +SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-http://localhost:8081}" +TOPIC_PREFIX="${TOPIC_PREFIX:-loadtest-topic}" +TOPIC_COUNT="${TOPIC_COUNT:-5}" + +echo "================================" +echo "Schema Format Verification" +echo "================================" +echo "" +echo "Schema Registry: $SCHEMA_REGISTRY_URL" +echo "Topic Prefix: $TOPIC_PREFIX" +echo "Topic Count: $TOPIC_COUNT" +echo "" + +echo "Registered Schemas:" +echo "-------------------" + +for i in $(seq 0 $((TOPIC_COUNT-1))); do + topic="${TOPIC_PREFIX}-${i}" + subject="${topic}-value" + + echo -n "Topic $i ($topic): " + + # Try to get schema + response=$(curl -s "${SCHEMA_REGISTRY_URL}/subjects/${subject}/versions/latest" 2>/dev/null || echo '{"error":"not found"}') + + if echo "$response" | grep -q "error"; then + echo "❌ NOT REGISTERED" + else + schema_type=$(echo "$response" | grep -o '"schemaType":"[^"]*"' | cut -d'"' -f4) + schema_id=$(echo "$response" | grep -o '"id":[0-9]*' | cut -d':' -f2) + + if [ -z "$schema_type" ]; then + schema_type="AVRO" # Default if not specified + fi + + # Expected format based on index + if [ $((i % 2)) -eq 0 ]; then + expected="AVRO" + else + expected="JSON" + fi + + if [ "$schema_type" = "$expected" ]; then + echo "✅ $schema_type (ID: $schema_id) - matches expected" + else + echo "⚠️ $schema_type (ID: $schema_id) - expected $expected" + fi + fi +done + +echo "" +echo "Expected Distribution:" +echo "----------------------" +echo "Even indices (0, 2, 4, ...): AVRO" +echo "Odd indices (1, 3, 5, ...): JSON" +echo "" + + |
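The verification script reads its targets from the environment, so a run against a non-default deployment needs no edits. A possible invocation (values are illustrative):

    # Check 8 topics under a custom prefix on a remote Schema Registry
    SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    TOPIC_PREFIX=loadtest-topic \
    TOPIC_COUNT=8 \
    ./verify_schema_formats.sh

The script queries each subject only once; run immediately after registration it can hit the same eventual-consistency window that test-retry-logic.sh simulates. A minimal sketch of wrapping the lookup in the same exponential backoff (the function name is illustrative; the REST path is the standard Confluent /subjects/<subject>/versions/latest endpoint, and curl and bc are already used by these scripts):

    check_subject_with_backoff() {
        local subject=$1
        local max_attempts=${2:-5}
        local registry=${SCHEMA_REGISTRY_URL:-http://localhost:8081}
        local attempt=1
        while [[ $attempt -le $max_attempts ]]; do
            # curl -sf exits non-zero on the 404 the registry returns while the
            # subject is not yet visible to its internal consumer
            if curl -sf "$registry/subjects/$subject/versions/latest" >/dev/null; then
                return 0
            fi
            if [[ $attempt -lt $max_attempts ]]; then
                sleep "$(echo "scale=3; 0.1 * (2 ^ ($attempt - 1))" | bc)"  # 0.1s, 0.2s, 0.4s, 0.8s
            fi
            attempt=$((attempt + 1))
        done
        return 1
    }

A caller would treat a non-zero exit as "subject not yet visible" and decide whether to fail the run or keep polling.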
