Diffstat (limited to 'test/kafka/kafka-client-loadtest')
-rw-r--r--  test/kafka/kafka-client-loadtest/.dockerignore  3
-rw-r--r--  test/kafka/kafka-client-loadtest/.gitignore  63
-rw-r--r--  test/kafka/kafka-client-loadtest/Dockerfile.loadtest  49
-rw-r--r--  test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs  37
-rw-r--r--  test/kafka/kafka-client-loadtest/Makefile  446
-rw-r--r--  test/kafka/kafka-client-loadtest/README.md  397
-rw-r--r--  test/kafka/kafka-client-loadtest/cmd/loadtest/main.go  465
-rw-r--r--  test/kafka/kafka-client-loadtest/config/loadtest.yaml  169
-rw-r--r--  test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml  46
-rw-r--r--  test/kafka/kafka-client-loadtest/docker-compose.yml  316
-rw-r--r--  test/kafka/kafka-client-loadtest/go.mod  41
-rw-r--r--  test/kafka/kafka-client-loadtest/go.sum  129
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/config/config.go  361
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/consumer/consumer.go  626
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/metrics/collector.go  353
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/producer/producer.go  770
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto  16
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go  185
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/schema/schemas.go  58
-rwxr-xr-x  test/kafka/kafka-client-loadtest/loadtest  bin 0 -> 17649346 bytes
-rw-r--r--  test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json  106
-rw-r--r--  test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json  62
-rw-r--r--  test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml  11
-rw-r--r--  test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml  12
-rw-r--r--  test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml  54
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/register-schemas.sh  423
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh  480
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh  352
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh  151
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh  291
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java  290
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java  72
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java  82
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java  68
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java  124
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java  78
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/go.mod  10
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/go.sum  24
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go  69
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/log4j.properties  12
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/pom.xml  72
-rwxr-xr-x  test/kafka/kafka-client-loadtest/tools/simple-test  bin 0 -> 8617650 bytes
-rwxr-xr-x  test/kafka/kafka-client-loadtest/verify_schema_formats.sh  63
43 files changed, 7436 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/.dockerignore b/test/kafka/kafka-client-loadtest/.dockerignore
new file mode 100644
index 000000000..1354ab263
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/.dockerignore
@@ -0,0 +1,3 @@
+# Keep only the Linux binaries
+!weed-linux-amd64
+!weed-linux-arm64
diff --git a/test/kafka/kafka-client-loadtest/.gitignore b/test/kafka/kafka-client-loadtest/.gitignore
new file mode 100644
index 000000000..ef136a5e2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/.gitignore
@@ -0,0 +1,63 @@
+# Binaries
+kafka-loadtest
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+
+# Test binary, built with `go test -c`
+*.test
+
+# Output of the go coverage tool
+*.out
+
+# Go workspace file
+go.work
+
+# Test results and logs
+test-results/
+*.log
+logs/
+
+# Docker volumes and data
+data/
+volumes/
+
+# Monitoring data
+monitoring/prometheus/data/
+monitoring/grafana/data/
+
+# IDE files
+.vscode/
+.idea/
+*.swp
+*.swo
+
+# OS generated files
+.DS_Store
+.DS_Store?
+._*
+.Spotlight-V100
+.Trashes
+ehthumbs.db
+Thumbs.db
+
+# Environment files
+.env
+.env.local
+.env.*.local
+
+# Temporary files
+tmp/
+temp/
+*.tmp
+
+# Coverage reports
+coverage.html
+coverage.out
+
+# Build artifacts
+bin/
+build/
+dist/
diff --git a/test/kafka/kafka-client-loadtest/Dockerfile.loadtest b/test/kafka/kafka-client-loadtest/Dockerfile.loadtest
new file mode 100644
index 000000000..ccf7e5e16
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/Dockerfile.loadtest
@@ -0,0 +1,49 @@
+# Kafka Client Load Test Runner Dockerfile
+# Multi-stage build for cross-platform support
+
+# Stage 1: Builder
+FROM golang:1.24-alpine AS builder
+
+WORKDIR /app
+
+# Copy go module files
+COPY test/kafka/kafka-client-loadtest/go.mod test/kafka/kafka-client-loadtest/go.sum ./
+RUN go mod download
+
+# Copy source code
+COPY test/kafka/kafka-client-loadtest/ ./
+
+# Build the loadtest binary
+RUN CGO_ENABLED=0 GOOS=linux go build -o /kafka-loadtest ./cmd/loadtest
+
+# Stage 2: Runtime
+FROM ubuntu:22.04
+
+# Install runtime dependencies
+RUN apt-get update && apt-get install -y \
+ ca-certificates \
+ curl \
+ jq \
+ bash \
+ netcat \
+ && rm -rf /var/lib/apt/lists/*
+
+# Copy built binary from builder stage
+COPY --from=builder /kafka-loadtest /usr/local/bin/kafka-loadtest
+RUN chmod +x /usr/local/bin/kafka-loadtest
+
+# Copy scripts and configuration
+COPY test/kafka/kafka-client-loadtest/scripts/ /scripts/
+COPY test/kafka/kafka-client-loadtest/config/ /config/
+
+# Create results directory
+RUN mkdir -p /test-results
+
+# Make scripts executable
+RUN chmod +x /scripts/*.sh
+
+WORKDIR /app
+
+# Default command runs the comprehensive load test
+CMD ["/usr/local/bin/kafka-loadtest", "-config", "/config/loadtest.yaml"]
+
diff --git a/test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs b/test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs
new file mode 100644
index 000000000..cde2e3df1
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/Dockerfile.seaweedfs
@@ -0,0 +1,37 @@
+# SeaweedFS Runtime Dockerfile for Kafka Client Load Tests
+# Optimized for fast builds - binary built locally and copied in
+FROM alpine:3.18
+
+# Install runtime dependencies
+RUN apk add --no-cache \
+ ca-certificates \
+ wget \
+ netcat-openbsd \
+ curl \
+ tzdata \
+ && rm -rf /var/cache/apk/*
+
+# Copy pre-built SeaweedFS binary (built locally for linux/amd64 or linux/arm64)
+# Cache-busting: Use build arg to force layer rebuild on every build
+ARG TARGETARCH=arm64
+ARG CACHE_BUST=unknown
+RUN echo "Building with cache bust: ${CACHE_BUST}"
+COPY weed-linux-${TARGETARCH} /usr/local/bin/weed
+RUN chmod +x /usr/local/bin/weed
+
+# Create data directory
+RUN mkdir -p /data
+
+# Set timezone
+ENV TZ=UTC
+
+# Health check script
+RUN echo '#!/bin/sh' > /usr/local/bin/health-check && \
+ echo 'exec "$@"' >> /usr/local/bin/health-check && \
+ chmod +x /usr/local/bin/health-check
+
+VOLUME ["/data"]
+WORKDIR /data
+
+ENTRYPOINT ["/usr/local/bin/weed"]
+
diff --git a/test/kafka/kafka-client-loadtest/Makefile b/test/kafka/kafka-client-loadtest/Makefile
new file mode 100644
index 000000000..362b5c680
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/Makefile
@@ -0,0 +1,446 @@
+# Kafka Client Load Test Makefile
+# Provides convenient targets for running load tests against SeaweedFS Kafka Gateway
+
+.PHONY: help build start stop restart clean test quick-test stress-test endurance-test monitor logs status
+
+# Configuration
+DOCKER_COMPOSE := docker compose
+PROJECT_NAME := kafka-client-loadtest
+CONFIG_FILE := config/loadtest.yaml
+
+# Build configuration
+GOARCH ?= arm64
+GOOS ?= linux
+
+# Default test parameters
+TEST_MODE ?= comprehensive
+TEST_DURATION ?= 300s
+PRODUCER_COUNT ?= 10
+CONSUMER_COUNT ?= 5
+MESSAGE_RATE ?= 1000
+MESSAGE_SIZE ?= 1024
+
+# Colors for output
+GREEN := \033[0;32m
+YELLOW := \033[0;33m
+BLUE := \033[0;34m
+NC := \033[0m
+
+help: ## Show this help message
+ @echo "Kafka Client Load Test Makefile"
+ @echo ""
+ @echo "Available targets:"
+ @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf " $(BLUE)%-20s$(NC) %s\n", $$1, $$2}' $(MAKEFILE_LIST)
+ @echo ""
+ @echo "Environment variables:"
+ @echo " TEST_MODE Test mode: producer, consumer, comprehensive (default: comprehensive)"
+ @echo " TEST_DURATION Test duration (default: 300s)"
+ @echo " PRODUCER_COUNT Number of producers (default: 10)"
+ @echo " CONSUMER_COUNT Number of consumers (default: 5)"
+ @echo " MESSAGE_RATE Messages per second per producer (default: 1000)"
+ @echo " MESSAGE_SIZE Message size in bytes (default: 1024)"
+ @echo ""
+ @echo "Examples:"
+ @echo " make test # Run default comprehensive test"
+ @echo " make test TEST_DURATION=10m # Run 10-minute test"
+ @echo " make quick-test # Run quick smoke test (rebuilds gateway)"
+ @echo " make stress-test # Run high-load stress test"
+ @echo " make test TEST_MODE=producer # Producer-only test"
+ @echo " make schema-test # Run schema integration test with Schema Registry"
+ @echo " make schema-quick-test # Run quick schema test (30s timeout)"
+ @echo " make schema-loadtest # Run load test with schemas enabled"
+ @echo " make build-binary # Build SeaweedFS binary locally for Linux"
+ @echo " make build-gateway # Build Kafka Gateway (builds binary + Docker image)"
+ @echo " make build-gateway-clean # Build Kafka Gateway with no cache (fresh build)"
+
+build: ## Build the load test application
+ @echo "$(BLUE)Building load test application...$(NC)"
+ $(DOCKER_COMPOSE) build kafka-client-loadtest
+ @echo "$(GREEN)Build completed$(NC)"
+
+build-binary: ## Build the SeaweedFS binary locally for Linux
+ @echo "$(BLUE)Building SeaweedFS binary locally for $(GOOS) $(GOARCH)...$(NC)"
+ cd ../../.. && \
+ CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \
+ -ldflags="-s -w" \
+ -tags "5BytesOffset" \
+ -o test/kafka/kafka-client-loadtest/weed-$(GOOS)-$(GOARCH) \
+ weed/weed.go
+ @echo "$(GREEN)Binary build completed: weed-$(GOOS)-$(GOARCH)$(NC)"
+
+build-gateway: build-binary ## Build the Kafka Gateway with latest changes
+ @echo "$(BLUE)Building Kafka Gateway Docker image...$(NC)"
+ CACHE_BUST=$$(date +%s) $(DOCKER_COMPOSE) build kafka-gateway
+ @echo "$(GREEN)Kafka Gateway build completed$(NC)"
+
+build-gateway-clean: build-binary ## Build the Kafka Gateway with no cache (force fresh build)
+ @echo "$(BLUE)Building Kafka Gateway Docker image with no cache...$(NC)"
+ $(DOCKER_COMPOSE) build --no-cache kafka-gateway
+ @echo "$(GREEN)Kafka Gateway clean build completed$(NC)"
+
+setup: ## Set up monitoring and configuration
+ @echo "$(BLUE)Setting up monitoring configuration...$(NC)"
+ ./scripts/setup-monitoring.sh
+ @echo "$(GREEN)Setup completed$(NC)"
+
+start: build-gateway ## Start the infrastructure services (without load test)
+ @echo "$(BLUE)Starting SeaweedFS infrastructure...$(NC)"
+ $(DOCKER_COMPOSE) up -d \
+ seaweedfs-master \
+ seaweedfs-volume \
+ seaweedfs-filer \
+ seaweedfs-mq-broker \
+ kafka-gateway \
+ schema-registry-init \
+ schema-registry
+ @echo "$(GREEN)Infrastructure started$(NC)"
+ @echo "Waiting for services to be ready..."
+ ./scripts/wait-for-services.sh wait
+ @echo "$(GREEN)All services are ready!$(NC)"
+
+stop: ## Stop all services
+ @echo "$(BLUE)Stopping all services...$(NC)"
+ $(DOCKER_COMPOSE) --profile loadtest --profile monitoring down
+ @echo "$(GREEN)Services stopped$(NC)"
+
+restart: stop start ## Restart all services
+
+clean: ## Clean up all resources (containers, volumes, networks, local data)
+ @echo "$(YELLOW)Warning: This will remove all volumes and data!$(NC)"
+ @echo "Press Ctrl+C to cancel, or wait 5 seconds to continue..."
+ @sleep 5
+ @echo "$(BLUE)Cleaning up all resources...$(NC)"
+ $(DOCKER_COMPOSE) --profile loadtest --profile monitoring down -v --remove-orphans
+ docker system prune -f
+ @if [ -f "weed-linux-arm64" ]; then \
+ echo "$(BLUE)Removing local binary...$(NC)"; \
+ rm -f weed-linux-arm64; \
+ fi
+ @if [ -d "data" ]; then \
+ echo "$(BLUE)Removing ALL local data directories (including offset state)...$(NC)"; \
+ rm -rf data/*; \
+ fi
+ @echo "$(GREEN)Cleanup completed - all data removed$(NC)"
+
+clean-binary: ## Clean up only the local binary
+ @echo "$(BLUE)Removing local binary...$(NC)"
+ @rm -f weed-linux-arm64
+ @echo "$(GREEN)Binary cleanup completed$(NC)"
+
+status: ## Show service status
+ @echo "$(BLUE)Service Status:$(NC)"
+ $(DOCKER_COMPOSE) ps
+
+logs: ## Show logs from all services
+ $(DOCKER_COMPOSE) logs -f
+
+test: start ## Run the comprehensive load test
+ @echo "$(BLUE)Running Kafka client load test...$(NC)"
+ @echo "Mode: $(TEST_MODE), Duration: $(TEST_DURATION)"
+ @echo "Producers: $(PRODUCER_COUNT), Consumers: $(CONSUMER_COUNT)"
+ @echo "Message Rate: $(MESSAGE_RATE) msgs/sec, Size: $(MESSAGE_SIZE) bytes"
+ @echo ""
+ @docker rm -f kafka-client-loadtest-runner 2>/dev/null || true
+ TEST_MODE=$(TEST_MODE) TEST_DURATION=$(TEST_DURATION) PRODUCER_COUNT=$(PRODUCER_COUNT) CONSUMER_COUNT=$(CONSUMER_COUNT) MESSAGE_RATE=$(MESSAGE_RATE) MESSAGE_SIZE=$(MESSAGE_SIZE) VALUE_TYPE=$(VALUE_TYPE) $(DOCKER_COMPOSE) --profile loadtest up --abort-on-container-exit kafka-client-loadtest
+ @echo "$(GREEN)Load test completed!$(NC)"
+ @$(MAKE) show-results
+
+quick-test: build-gateway ## Run a quick smoke test (1 min, low load, WITH schemas)
+ @echo "$(BLUE)================================================================$(NC)"
+ @echo "$(BLUE) Quick Test (Low Load, WITH Schema Registry + Avro) $(NC)"
+ @echo "$(BLUE) - Duration: 1 minute $(NC)"
+ @echo "$(BLUE) - Load: 1 producer × 10 msg/sec = 10 total msg/sec $(NC)"
+ @echo "$(BLUE) - Message Type: Avro (with schema encoding) $(NC)"
+ @echo "$(BLUE) - Schema-First: Registers schemas BEFORE producing $(NC)"
+ @echo "$(BLUE)================================================================$(NC)"
+ @echo ""
+ @$(MAKE) start
+ @echo ""
+ @echo "$(BLUE)=== Step 1: Registering schemas in Schema Registry ===$(NC)"
+ @echo "$(YELLOW)[WARN] IMPORTANT: Schemas MUST be registered before producing Avro messages!$(NC)"
+ @./scripts/register-schemas.sh full
+ @echo "$(GREEN)- Schemas registered successfully$(NC)"
+ @echo ""
+ @echo "$(BLUE)=== Step 2: Running load test with Avro messages ===$(NC)"
+ @$(MAKE) test \
+ TEST_MODE=comprehensive \
+ TEST_DURATION=60s \
+ PRODUCER_COUNT=1 \
+ CONSUMER_COUNT=1 \
+ MESSAGE_RATE=10 \
+ MESSAGE_SIZE=256 \
+ VALUE_TYPE=avro
+ @echo ""
+ @echo "$(GREEN)================================================================$(NC)"
+ @echo "$(GREEN) Quick Test Complete! $(NC)"
+ @echo "$(GREEN) - Schema Registration $(NC)"
+ @echo "$(GREEN) - Avro Message Production $(NC)"
+ @echo "$(GREEN) - Message Consumption $(NC)"
+ @echo "$(GREEN)================================================================$(NC)"
+
+standard-test: ## Run a standard load test (2 min, medium load, WITH Schema Registry + Avro)
+ @echo "$(BLUE)================================================================$(NC)"
+ @echo "$(BLUE) Standard Test (Medium Load, WITH Schema Registry) $(NC)"
+ @echo "$(BLUE) - Duration: 2 minutes $(NC)"
+ @echo "$(BLUE) - Load: 2 producers × 50 msg/sec = 100 total msg/sec $(NC)"
+ @echo "$(BLUE) - Message Type: Avro (with schema encoding) $(NC)"
+ @echo "$(BLUE) - IMPORTANT: Schemas registered FIRST in Schema Registry $(NC)"
+ @echo "$(BLUE)================================================================$(NC)"
+ @echo ""
+ @$(MAKE) start
+ @echo ""
+ @echo "$(BLUE)=== Step 1: Registering schemas in Schema Registry ===$(NC)"
+ @echo "$(YELLOW)Note: Schemas MUST be registered before producing Avro messages!$(NC)"
+ @./scripts/register-schemas.sh full
+ @echo "$(GREEN)- Schemas registered$(NC)"
+ @echo ""
+ @echo "$(BLUE)=== Step 2: Running load test with Avro messages ===$(NC)"
+ @$(MAKE) test \
+ TEST_MODE=comprehensive \
+ TEST_DURATION=2m \
+ PRODUCER_COUNT=2 \
+ CONSUMER_COUNT=2 \
+ MESSAGE_RATE=50 \
+ MESSAGE_SIZE=512 \
+ VALUE_TYPE=avro
+ @echo ""
+ @echo "$(GREEN)================================================================$(NC)"
+ @echo "$(GREEN) Standard Test Complete! $(NC)"
+ @echo "$(GREEN)================================================================$(NC)"
+
+stress-test: ## Run a stress test (10 minutes, high load) with schemas
+ @echo "$(BLUE)Starting stress test with schema registration...$(NC)"
+ @$(MAKE) start
+ @echo "$(BLUE)Registering schemas with Schema Registry...$(NC)"
+ @./scripts/register-schemas.sh full
+ @echo "$(BLUE)Running stress test with registered schemas...$(NC)"
+ @$(MAKE) test \
+ TEST_MODE=comprehensive \
+ TEST_DURATION=10m \
+ PRODUCER_COUNT=20 \
+ CONSUMER_COUNT=10 \
+ MESSAGE_RATE=2000 \
+ MESSAGE_SIZE=2048 \
+ VALUE_TYPE=avro
+
+endurance-test: ## Run an endurance test (30 minutes, sustained load) with schemas
+ @echo "$(BLUE)Starting endurance test with schema registration...$(NC)"
+ @$(MAKE) start
+ @echo "$(BLUE)Registering schemas with Schema Registry...$(NC)"
+ @./scripts/register-schemas.sh full
+ @echo "$(BLUE)Running endurance test with registered schemas...$(NC)"
+ @$(MAKE) test \
+ TEST_MODE=comprehensive \
+ TEST_DURATION=30m \
+ PRODUCER_COUNT=10 \
+ CONSUMER_COUNT=5 \
+ MESSAGE_RATE=1000 \
+ MESSAGE_SIZE=1024 \
+ VALUE_TYPE=avro
+
+producer-test: ## Run producer-only load test
+ @$(MAKE) test TEST_MODE=producer
+
+consumer-test: ## Run consumer-only load test (requires existing messages)
+ @$(MAKE) test TEST_MODE=consumer
+
+register-schemas: start ## Register schemas with Schema Registry
+ @echo "$(BLUE)Registering schemas with Schema Registry...$(NC)"
+ @./scripts/register-schemas.sh full
+ @echo "$(GREEN)Schema registration completed!$(NC)"
+
+verify-schemas: ## Verify schemas are registered in Schema Registry
+ @echo "$(BLUE)Verifying schemas in Schema Registry...$(NC)"
+ @./scripts/register-schemas.sh verify
+ @echo "$(GREEN)Schema verification completed!$(NC)"
+
+list-schemas: ## List all registered schemas in Schema Registry
+ @echo "$(BLUE)Listing registered schemas...$(NC)"
+ @./scripts/register-schemas.sh list
+
+cleanup-schemas: ## Clean up test schemas from Schema Registry
+ @echo "$(YELLOW)Cleaning up test schemas...$(NC)"
+ @./scripts/register-schemas.sh cleanup
+ @echo "$(GREEN)Schema cleanup completed!$(NC)"
+
+schema-test: start ## Run schema integration test (with Schema Registry)
+ @echo "$(BLUE)Running schema integration test...$(NC)"
+ @echo "Testing Schema Registry integration with schematized topics"
+ @echo ""
+ CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o schema-test-linux test_schema_integration.go
+ docker run --rm --network kafka-client-loadtest \
+ -v $(PWD)/schema-test-linux:/usr/local/bin/schema-test \
+ alpine:3.18 /usr/local/bin/schema-test
+ @rm -f schema-test-linux
+ @echo "$(GREEN)Schema integration test completed!$(NC)"
+
+schema-quick-test: start ## Run quick schema test (lighter version)
+ @echo "$(BLUE)Running quick schema test...$(NC)"
+ @echo "Testing basic schema functionality"
+ @echo ""
+ CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o schema-test-linux test_schema_integration.go
+ timeout 60s docker run --rm --network kafka-client-loadtest \
+ -v $(PWD)/schema-test-linux:/usr/local/bin/schema-test \
+ alpine:3.18 /usr/local/bin/schema-test || true
+ @rm -f schema-test-linux
+ @echo "$(GREEN)Quick schema test completed!$(NC)"
+
+simple-schema-test: start ## Run simple schema test (step-by-step)
+ @echo "$(BLUE)Running simple schema test...$(NC)"
+ @echo "Step-by-step schema functionality test"
+ @echo ""
+ @mkdir -p simple-test
+ @cp simple_schema_test.go simple-test/main.go
+ cd simple-test && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ../simple-schema-test-linux .
+ docker run --rm --network kafka-client-loadtest \
+ -v $(PWD)/simple-schema-test-linux:/usr/local/bin/simple-schema-test \
+ alpine:3.18 /usr/local/bin/simple-schema-test
+ @rm -f simple-schema-test-linux
+ @rm -rf simple-test
+ @echo "$(GREEN)Simple schema test completed!$(NC)"
+
+basic-schema-test: start ## Run basic schema test (manual schema handling without Schema Registry)
+ @echo "$(BLUE)Running basic schema test...$(NC)"
+ @echo "Testing schema functionality without Schema Registry dependency"
+ @echo ""
+ @mkdir -p basic-test
+ @cp basic_schema_test.go basic-test/main.go
+ cd basic-test && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ../basic-schema-test-linux .
+ timeout 60s docker run --rm --network kafka-client-loadtest \
+ -v $(PWD)/basic-schema-test-linux:/usr/local/bin/basic-schema-test \
+ alpine:3.18 /usr/local/bin/basic-schema-test
+ @rm -f basic-schema-test-linux
+ @rm -rf basic-test
+ @echo "$(GREEN)Basic schema test completed!$(NC)"
+
+schema-loadtest: start ## Run load test with schemas enabled
+ @echo "$(BLUE)Running schema-enabled load test...$(NC)"
+ @echo "Mode: comprehensive with schemas, Duration: 3m"
+ @echo "Producers: 3, Consumers: 2, Message Rate: 50 msgs/sec"
+ @echo ""
+ TEST_MODE=comprehensive \
+ TEST_DURATION=3m \
+ PRODUCER_COUNT=3 \
+ CONSUMER_COUNT=2 \
+ MESSAGE_RATE=50 \
+ MESSAGE_SIZE=1024 \
+ SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
+ $(DOCKER_COMPOSE) --profile loadtest up --abort-on-container-exit kafka-client-loadtest
+ @echo "$(GREEN)Schema load test completed!$(NC)"
+ @$(MAKE) show-results
+
+monitor: setup ## Start monitoring stack (Prometheus + Grafana)
+ @echo "$(BLUE)Starting monitoring stack...$(NC)"
+ $(DOCKER_COMPOSE) --profile monitoring up -d prometheus grafana
+ @echo "$(GREEN)Monitoring stack started!$(NC)"
+ @echo ""
+ @echo "Access points:"
+ @echo " Prometheus: http://localhost:9090"
+ @echo " Grafana: http://localhost:3000 (admin/admin)"
+
+monitor-stop: ## Stop monitoring stack
+ @echo "$(BLUE)Stopping monitoring stack...$(NC)"
+ $(DOCKER_COMPOSE) --profile monitoring stop prometheus grafana
+ @echo "$(GREEN)Monitoring stack stopped$(NC)"
+
+test-with-monitoring: monitor start ## Run test with monitoring enabled
+ @echo "$(BLUE)Running load test with monitoring...$(NC)"
+ @$(MAKE) test
+ @echo ""
+ @echo "$(GREEN)Test completed! Check the monitoring dashboards:$(NC)"
+ @echo " Prometheus: http://localhost:9090"
+ @echo " Grafana: http://localhost:3000 (admin/admin)"
+
+show-results: ## Show test results
+ @echo "$(BLUE)Test Results Summary:$(NC)"
+ @if $(DOCKER_COMPOSE) ps -q kafka-client-loadtest-runner >/dev/null 2>&1; then \
+ $(DOCKER_COMPOSE) exec -T kafka-client-loadtest-runner curl -s http://localhost:8080/stats 2>/dev/null || echo "Results not available"; \
+ else \
+ echo "Load test container not running"; \
+ fi
+ @echo ""
+ @if [ -d "test-results" ]; then \
+ echo "Detailed results saved to: test-results/"; \
+ ls -la test-results/ 2>/dev/null || true; \
+ fi
+
+health-check: ## Check health of all services
+ @echo "$(BLUE)Checking service health...$(NC)"
+ ./scripts/wait-for-services.sh check
+
+validate-setup: ## Validate the test setup
+ @echo "$(BLUE)Validating test setup...$(NC)"
+ @echo "Checking Docker and Docker Compose..."
+ @docker --version
+ @docker compose version || docker-compose --version
+ @echo ""
+ @echo "Checking configuration file..."
+ @if [ -f "$(CONFIG_FILE)" ]; then \
+ echo "- Configuration file exists: $(CONFIG_FILE)"; \
+ else \
+ echo "x Configuration file not found: $(CONFIG_FILE)"; \
+ exit 1; \
+ fi
+ @echo ""
+ @echo "Checking scripts..."
+ @for script in scripts/*.sh; do \
+ if [ -x "$$script" ]; then \
+ echo "- $$script is executable"; \
+ else \
+ echo "x $$script is not executable"; \
+ fi; \
+ done
+ @echo "$(GREEN)Setup validation completed$(NC)"
+
+dev-env: ## Set up development environment
+ @echo "$(BLUE)Setting up development environment...$(NC)"
+ @echo "Installing Go dependencies..."
+ go mod download
+ go mod tidy
+ @echo "$(GREEN)Development environment ready$(NC)"
+
+benchmark: ## Run comprehensive benchmarking suite
+ @echo "$(BLUE)Running comprehensive benchmark suite...$(NC)"
+ @echo "This will run multiple test scenarios and collect detailed metrics"
+ @echo ""
+ @$(MAKE) quick-test
+ @sleep 10
+ @$(MAKE) standard-test
+ @sleep 10
+ @$(MAKE) stress-test
+ @echo "$(GREEN)Benchmark suite completed!$(NC)"
+
+# Advanced targets
+debug: ## Start services in debug mode with verbose logging
+ @echo "$(BLUE)Starting services in debug mode...$(NC)"
+ SEAWEEDFS_LOG_LEVEL=debug \
+ KAFKA_LOG_LEVEL=debug \
+ $(DOCKER_COMPOSE) up \
+ seaweedfs-master \
+ seaweedfs-volume \
+ seaweedfs-filer \
+ seaweedfs-mq-broker \
+ kafka-gateway \
+ schema-registry
+
+attach-loadtest: ## Attach to running load test container
+ $(DOCKER_COMPOSE) exec kafka-client-loadtest-runner /bin/sh
+
+exec-master: ## Execute shell in SeaweedFS master container
+ $(DOCKER_COMPOSE) exec seaweedfs-master /bin/sh
+
+exec-filer: ## Execute shell in SeaweedFS filer container
+ $(DOCKER_COMPOSE) exec seaweedfs-filer /bin/sh
+
+exec-gateway: ## Execute shell in Kafka gateway container
+ $(DOCKER_COMPOSE) exec kafka-gateway /bin/sh
+
+# Utility targets
+ps: status ## Alias for status
+
+up: start ## Alias for start
+
+down: stop ## Alias for stop
+
+# Help is the default target
+.DEFAULT_GOAL := help
diff --git a/test/kafka/kafka-client-loadtest/README.md b/test/kafka/kafka-client-loadtest/README.md
new file mode 100644
index 000000000..4f465a21b
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/README.md
@@ -0,0 +1,397 @@
+# Kafka Client Load Test for SeaweedFS
+
+This comprehensive load testing suite validates the SeaweedFS MQ stack using real Kafka client libraries. Unlike the existing SMQ tests, this uses actual Kafka clients (`sarama` and `confluent-kafka-go`) to test the complete integration through:
+
+- **Kafka Clients** → **SeaweedFS Kafka Gateway** → **SeaweedFS MQ Broker** → **SeaweedFS Storage**
+
+## Architecture
+
+```
+┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
+│ Kafka Client │ │ Kafka Gateway │ │ SeaweedFS MQ │
+│ Load Test │───▶│ (Port 9093) │───▶│ Broker │
+│ - Producers │ │ │ │ │
+│ - Consumers │ │ Protocol │ │ Topic Management │
+│ │ │ Translation │ │ Message Storage │
+└─────────────────┘ └──────────────────┘ └─────────────────────┘
+ │
+ ▼
+ ┌─────────────────────┐
+ │ SeaweedFS Storage │
+ │ - Master │
+ │ - Volume Server │
+ │ - Filer │
+ └─────────────────────┘
+```
+
+## Features
+
+### 🚀 **Multiple Test Modes**
+- **Producer-only**: Pure message production testing
+- **Consumer-only**: Consumption from existing topics
+- **Comprehensive**: Full producer + consumer load testing
+
+### 📊 **Rich Metrics & Monitoring**
+- Prometheus metrics collection
+- Grafana dashboards
+- Real-time throughput and latency tracking
+- Consumer lag monitoring
+- Error rate analysis
+
+### 🔧 **Configurable Test Scenarios**
+- **Quick Test**: 1-minute smoke test
+- **Standard Test**: 5-minute medium load
+- **Stress Test**: 10-minute high load
+- **Endurance Test**: 30-minute sustained load
+- **Custom**: Fully configurable parameters
+
+### 📈 **Message Types**
+- **JSON**: Structured test messages
+- **Avro**: Schema Registry integration
+- **Binary**: Raw binary payloads
+
+### 🛠 **Kafka Client Support**
+- **Sarama**: Native Go Kafka client
+- **Confluent**: Official Confluent Go client
+- Schema Registry integration
+- Consumer group management
+
+## Quick Start
+
+### Prerequisites
+- Docker & Docker Compose
+- Make (optional, but recommended)
+
+### 1. Run Default Test
+```bash
+make test
+```
+This runs a 5-minute comprehensive test with 10 producers and 5 consumers.
+
+### 2. Quick Smoke Test
+```bash
+make quick-test
+```
+1-minute test with minimal load for validation.
+
+### 3. Stress Test
+```bash
+make stress-test
+```
+10-minute high-throughput test with 20 producers and 10 consumers.
+
+### 4. Test with Monitoring
+```bash
+make test-with-monitoring
+```
+Includes Prometheus + Grafana dashboards for real-time monitoring.
+
+## Detailed Usage
+
+### Manual Control
+```bash
+# Start infrastructure only
+make start
+
+# Run load test against running infrastructure
+make test TEST_MODE=comprehensive TEST_DURATION=10m
+
+# Stop everything
+make stop
+
+# Clean up all resources
+make clean
+```
+
+### Using Scripts Directly
+```bash
+# Full control with the main script
+./scripts/run-loadtest.sh start -m comprehensive -d 10m --monitoring
+
+# Check service health
+./scripts/wait-for-services.sh check
+
+# Setup monitoring configurations
+./scripts/setup-monitoring.sh
+```
+
+### Environment Variables
+```bash
+export TEST_MODE=comprehensive # producer, consumer, comprehensive
+export TEST_DURATION=300s # Test duration
+export PRODUCER_COUNT=10 # Number of producer instances
+export CONSUMER_COUNT=5 # Number of consumer instances
+export MESSAGE_RATE=1000 # Messages/second per producer
+export MESSAGE_SIZE=1024 # Message size in bytes
+export TOPIC_COUNT=5 # Number of topics to create
+export PARTITIONS_PER_TOPIC=3 # Partitions per topic
+
+make test
+```
+
+## Configuration
+
+### Main Configuration File
+Edit `config/loadtest.yaml` to customize:
+
+- **Kafka Settings**: Bootstrap servers, security, timeouts
+- **Producer Config**: Batching, compression, acknowledgments
+- **Consumer Config**: Group settings, fetch parameters
+- **Message Settings**: Size, format (JSON/Avro/Binary)
+- **Schema Registry**: Avro/Protobuf schema validation
+- **Metrics**: Prometheus collection intervals
+- **Test Scenarios**: Predefined load patterns
+
+### Example Custom Configuration
+```yaml
+test_mode: "comprehensive"
+duration: "600s" # 10 minutes
+
+producers:
+ count: 15
+ message_rate: 2000
+ message_size: 2048
+ compression_type: "snappy"
+ acks: "all"
+
+consumers:
+ count: 8
+ group_prefix: "high-load-group"
+ max_poll_records: 1000
+
+topics:
+ count: 10
+ partitions: 6
+ replication_factor: 1
+```
+
+## Test Scenarios
+
+### 1. Producer Performance Test
+```bash
+make producer-test TEST_DURATION=10m PRODUCER_COUNT=20 MESSAGE_RATE=3000
+```
+Tests maximum message production throughput.
+
+### 2. Consumer Performance Test
+```bash
+# First produce messages
+make producer-test TEST_DURATION=5m
+
+# Then test consumption
+make consumer-test TEST_DURATION=10m CONSUMER_COUNT=15
+```
+
+### 3. Schema Registry Integration
+```bash
+# Enable schemas in config/loadtest.yaml
+schemas:
+ enabled: true
+
+make test
+```
+Tests Avro message serialization through Schema Registry.
+
+### 4. High Availability Test
+```bash
+# Test with container restarts during load
+make test TEST_DURATION=20m &
+sleep 300
+docker restart kafka-gateway
+```
+
+## Monitoring & Metrics
+
+### Real-Time Dashboards
+When monitoring is enabled:
+- **Prometheus**: http://localhost:9090
+- **Grafana**: http://localhost:3000 (admin/admin)
+
+### Key Metrics Tracked
+- **Throughput**: Messages/second, MB/second
+- **Latency**: End-to-end message latency percentiles
+- **Errors**: Producer/consumer error rates
+- **Consumer Lag**: Per-partition lag monitoring
+- **Resource Usage**: CPU, memory, disk I/O
+
+### Grafana Dashboards
+- **Kafka Load Test**: Comprehensive test metrics
+- **SeaweedFS Cluster**: Storage system health
+- **Custom Dashboards**: Extensible monitoring
+
+## Advanced Features
+
+### Schema Registry Testing
+```bash
+# Test Avro message serialization
+export VALUE_TYPE=avro
+make test
+```
+
+The load test includes (schema registration is sketched after this list):
+- Schema registration
+- Avro message encoding/decoding
+- Schema evolution testing
+- Compatibility validation
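+
+The registration call the load test makes lives in `registerTopicSchema` (`cmd/loadtest/main.go`)
+and in `scripts/register-schemas.sh`; the standalone sketch below shows the same REST call in
+miniature. The registry URL, topic name, and trimmed schema are illustrative values taken from
+the default configuration, not a separate tool shipped with this suite.
+
+```go
+// Minimal sketch: register an Avro schema under the "<topic>-value" subject,
+// as the load test does before producing Avro messages.
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"net/http"
+)
+
+func main() {
+	// Trimmed version of the UserEvent schema from config/loadtest.yaml.
+	avroSchema := `{"type":"record","name":"UserEvent","namespace":"com.seaweedfs.test",
+	  "fields":[{"name":"user_id","type":"string"},{"name":"event_type","type":"string"},
+	  {"name":"timestamp","type":"long"}]}`
+
+	body, _ := json.Marshal(map[string]string{"schema": avroSchema, "schemaType": "AVRO"})
+
+	// Subject name follows the Kafka convention "<topic>-value".
+	url := "http://schema-registry:8081/subjects/loadtest-topic-0-value/versions"
+	resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewReader(body))
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	fmt.Println("registration status:", resp.Status)
+}
+```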
+
+### Multi-Client Testing
+The test supports both Sarama and Confluent clients:
+```go
+// Configure in producer/consumer code
+useConfluent := true // Switch client implementation
+```
+
+### Consumer Group Rebalancing
+- Automatic consumer group management
+- Partition rebalancing simulation
+- Consumer failure recovery testing (a minimal consumer-group sketch follows)
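+
+For orientation, the consumer side boils down to Sarama's consumer-group API. The sketch below
+is illustrative only (the real implementation is in `internal/consumer/consumer.go` and adds
+metrics, error handling, and graceful shutdown); the broker address, group ID, and topic name
+are the defaults from `config/loadtest.yaml`, and the import path assumes a current Sarama
+release (older code imports `github.com/Shopify/sarama`).
+
+```go
+package main
+
+import (
+	"context"
+	"log"
+
+	"github.com/IBM/sarama"
+)
+
+type handler struct{}
+
+func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
+func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
+func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
+		sess.MarkMessage(msg, "") // mark consumed; auto-commit flushes the offset
+	}
+	return nil
+}
+
+func main() {
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.V2_8_0_0                      // consumer groups need >= 0.10.2
+	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest // auto_offset_reset: earliest
+
+	group, err := sarama.NewConsumerGroup([]string{"kafka-gateway:9093"}, "loadtest-group-0", cfg)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer group.Close()
+
+	// Consume returns on every rebalance; loop to rejoin the group.
+	for {
+		if err := group.Consume(context.Background(), []string{"loadtest-topic-0"}, handler{}); err != nil {
+			log.Fatal(err)
+		}
+	}
+}
+```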
+
+### Chaos Testing
+```yaml
+chaos:
+ enabled: true
+ producer_failure_rate: 0.01
+ consumer_failure_rate: 0.01
+ network_partition_probability: 0.001
+```
+
+## Troubleshooting
+
+### Common Issues
+
+#### Services Not Starting
+```bash
+# Check service health
+make health-check
+
+# View detailed logs
+make logs
+
+# Debug mode
+make debug
+```
+
+#### Low Throughput
+- Increase `MESSAGE_RATE` and `PRODUCER_COUNT`
+- Adjust `batch_size` and `linger_ms` in config
+- Check consumer `max_poll_records` setting
+
+#### High Latency
+- Reduce `linger_ms` for lower latency
+- Adjust `acks` setting (0, 1, or "all")
+- Monitor consumer lag
+
+#### Memory Issues
+```bash
+# Reduce concurrent clients
+make test PRODUCER_COUNT=5 CONSUMER_COUNT=3
+
+# Adjust message size
+make test MESSAGE_SIZE=512
+```
+
+### Debug Commands
+```bash
+# Execute shell in containers
+make exec-master
+make exec-filer
+make exec-gateway
+
+# Attach to load test
+make attach-loadtest
+
+# View real-time stats
+curl http://localhost:8080/stats
+```
+
+## Development
+
+### Building from Source
+```bash
+# Set up development environment
+make dev-env
+
+# Build load test binary
+make build
+
+# Run tests locally (requires Go 1.21+)
+cd cmd/loadtest && go run main.go -config ../../config/loadtest.yaml
+```
+
+### Extending the Tests
+1. **Add new message formats** in `internal/producer/`
+2. **Add custom metrics** in `internal/metrics/`
+3. **Create new test scenarios** in `config/loadtest.yaml`
+4. **Add monitoring panels** in `monitoring/grafana/dashboards/`
+
+### Contributing
+1. Fork the repository
+2. Create a feature branch
+3. Add tests for new functionality
+4. Ensure all tests pass: `make test`
+5. Submit a pull request
+
+## Performance Benchmarks
+
+### Expected Performance (on typical hardware)
+
+| Scenario | Producers | Consumers | Rate (msg/s) | Latency (p95) |
+|----------|-----------|-----------|--------------|---------------|
+| Quick | 2 | 2 | 200 | <10ms |
+| Standard | 5 | 3 | 2,500 | <20ms |
+| Stress | 20 | 10 | 40,000 | <50ms |
+| Endurance| 10 | 5 | 10,000 | <30ms |
+
+*Results vary based on hardware, network, and SeaweedFS configuration*
+
+### Tuning for Maximum Performance
+```yaml
+producers:
+ batch_size: 1000
+ linger_ms: 10
+ compression_type: "lz4"
+ acks: "1" # Balance between speed and durability
+
+consumers:
+ max_poll_records: 5000
+ fetch_min_bytes: 1048576 # 1MB
+ fetch_max_wait_ms: 100
+```
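+
+How these knobs map onto the Go clients is decided in `internal/producer` and `internal/consumer`;
+as a rough, non-authoritative guide, the closest Sarama settings would look like this
+(`max_poll_records` is a Java-client option with no direct Sarama equivalent):
+
+```go
+package tuning
+
+import (
+	"time"
+
+	"github.com/IBM/sarama"
+)
+
+// tunedConfig mirrors the YAML values above using Sarama's field names.
+func tunedConfig() *sarama.Config {
+	cfg := sarama.NewConfig()
+
+	// producers: batch_size / linger_ms / compression_type / acks
+	cfg.Producer.Flush.Messages = 1000                   // batch_size: 1000
+	cfg.Producer.Flush.Frequency = 10 * time.Millisecond // linger_ms: 10
+	cfg.Producer.Compression = sarama.CompressionLZ4     // compression_type: "lz4"
+	cfg.Producer.RequiredAcks = sarama.WaitForLocal      // acks: "1"
+
+	// consumers: fetch_min_bytes / fetch_max_wait_ms
+	cfg.Consumer.Fetch.Min = 1 << 20                  // fetch_min_bytes: 1048576 (1MB)
+	cfg.Consumer.MaxWaitTime = 100 * time.Millisecond // fetch_max_wait_ms: 100
+
+	return cfg
+}
+```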
+
+## Comparison with Existing Tests
+
+| Feature | SMQ Tests | **Kafka Client Load Test** |
+|---------|-----------|----------------------------|
+| Protocol | SMQ (SeaweedFS native) | **Kafka (industry standard)** |
+| Clients | SMQ clients | **Real Kafka clients (Sarama, Confluent)** |
+| Schema Registry | ❌ | **✅ Full Avro/Protobuf support** |
+| Consumer Groups | Basic | **✅ Full Kafka consumer group features** |
+| Monitoring | Basic | **✅ Prometheus + Grafana dashboards** |
+| Test Scenarios | Limited | **✅ Multiple predefined scenarios** |
+| Real-world | Synthetic | **✅ Production-like workloads** |
+
+This load test provides comprehensive validation of the SeaweedFS Kafka Gateway using real-world Kafka clients and protocols.
+
+---
+
+## Quick Reference
+
+```bash
+# Essential Commands
+make help # Show all available commands
+make test # Run default comprehensive test
+make quick-test # 1-minute smoke test
+make stress-test # High-load stress test
+make test-with-monitoring # Include Grafana dashboards
+make clean # Clean up all resources
+
+# Monitoring
+make monitor # Start Prometheus + Grafana
+# → http://localhost:9090 (Prometheus)
+# → http://localhost:3000 (Grafana, admin/admin)
+
+# Advanced
+make benchmark # Run full benchmark suite
+make health-check # Validate service health
+make validate-setup # Check configuration
+```
diff --git a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go
new file mode 100644
index 000000000..2f435e600
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go
@@ -0,0 +1,465 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/consumer"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/producer"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema"
+)
+
+var (
+ configFile = flag.String("config", "/config/loadtest.yaml", "Path to configuration file")
+ testMode = flag.String("mode", "", "Test mode override (producer|consumer|comprehensive)")
+ duration = flag.Duration("duration", 0, "Test duration override")
+ help = flag.Bool("help", false, "Show help")
+)
+
+func main() {
+ flag.Parse()
+
+ if *help {
+ printHelp()
+ return
+ }
+
+ // Load configuration
+ cfg, err := config.Load(*configFile)
+ if err != nil {
+ log.Fatalf("Failed to load configuration: %v", err)
+ }
+
+ // Override configuration with environment variables and flags
+ cfg.ApplyOverrides(*testMode, *duration)
+
+ // Initialize metrics
+ metricsCollector := metrics.NewCollector()
+
+ // Start metrics HTTP server
+ go func() {
+ http.Handle("/metrics", promhttp.Handler())
+ http.HandleFunc("/health", healthCheck)
+ http.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
+ metricsCollector.WriteStats(w)
+ })
+
+ log.Printf("Starting metrics server on :8080")
+ if err := http.ListenAndServe(":8080", nil); err != nil {
+ log.Printf("Metrics server error: %v", err)
+ }
+ }()
+
+ // Set up signal handling
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ sigCh := make(chan os.Signal, 1)
+ signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+
+ log.Printf("Starting Kafka Client Load Test")
+ log.Printf("Mode: %s, Duration: %v", cfg.TestMode, cfg.Duration)
+ log.Printf("Kafka Brokers: %v", cfg.Kafka.BootstrapServers)
+ log.Printf("Schema Registry: %s", cfg.SchemaRegistry.URL)
+ log.Printf("Schemas Enabled: %v", cfg.Schemas.Enabled)
+
+ // Register schemas if enabled
+ if cfg.Schemas.Enabled {
+ log.Printf("Registering schemas with Schema Registry...")
+ if err := registerSchemas(cfg); err != nil {
+ log.Fatalf("Failed to register schemas: %v", err)
+ }
+ log.Printf("Schemas registered successfully")
+ }
+
+ var wg sync.WaitGroup
+
+ // Start test based on mode
+ var testErr error
+ switch cfg.TestMode {
+ case "producer":
+ testErr = runProducerTest(ctx, cfg, metricsCollector, &wg)
+ case "consumer":
+ testErr = runConsumerTest(ctx, cfg, metricsCollector, &wg)
+ case "comprehensive":
+ testErr = runComprehensiveTest(ctx, cancel, cfg, metricsCollector, &wg)
+ default:
+ log.Fatalf("Unknown test mode: %s", cfg.TestMode)
+ }
+
+ // If test returned an error (e.g., circuit breaker), exit
+ if testErr != nil {
+ log.Printf("Test failed with error: %v", testErr)
+ cancel() // Cancel context to stop any remaining goroutines
+ return
+ }
+
+ // Wait for completion or signal
+ done := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-sigCh:
+ log.Printf("Received shutdown signal, stopping tests...")
+ cancel()
+
+ // Wait for graceful shutdown with timeout
+ shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer shutdownCancel()
+
+ select {
+ case <-done:
+ log.Printf("All tests completed gracefully")
+ case <-shutdownCtx.Done():
+ log.Printf("Shutdown timeout, forcing exit")
+ }
+ case <-done:
+ log.Printf("All tests completed")
+ }
+
+ // Print final statistics
+ log.Printf("Final Test Statistics:")
+ metricsCollector.PrintSummary()
+}
+
+func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
+ log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count)
+
+ errChan := make(chan error, cfg.Producers.Count)
+
+ for i := 0; i < cfg.Producers.Count; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+
+ prod, err := producer.New(cfg, collector, id)
+ if err != nil {
+ log.Printf("Failed to create producer %d: %v", id, err)
+ errChan <- err
+ return
+ }
+ defer prod.Close()
+
+ if err := prod.Run(ctx); err != nil {
+ log.Printf("Producer %d failed: %v", id, err)
+ errChan <- err
+ return
+ }
+ }(i)
+ }
+
+ // Wait for any producer error
+ select {
+ case err := <-errChan:
+ log.Printf("Producer test failed: %v", err)
+ return err
+ default:
+ return nil
+ }
+}
+
+func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
+ log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count)
+
+ errChan := make(chan error, cfg.Consumers.Count)
+
+ for i := 0; i < cfg.Consumers.Count; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+
+ cons, err := consumer.New(cfg, collector, id)
+ if err != nil {
+ log.Printf("Failed to create consumer %d: %v", id, err)
+ errChan <- err
+ return
+ }
+ defer cons.Close()
+
+ cons.Run(ctx)
+ }(i)
+ }
+
+ // Consumers don't typically return errors in the same way, so just return nil
+ return nil
+}
+
+func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
+ log.Printf("Starting comprehensive test with %d producers and %d consumers",
+ cfg.Producers.Count, cfg.Consumers.Count)
+
+ errChan := make(chan error, cfg.Producers.Count)
+
+ // Create separate contexts for producers and consumers
+ producerCtx, producerCancel := context.WithCancel(ctx)
+ consumerCtx, consumerCancel := context.WithCancel(ctx)
+
+ // Start producers
+ for i := 0; i < cfg.Producers.Count; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+
+ prod, err := producer.New(cfg, collector, id)
+ if err != nil {
+ log.Printf("Failed to create producer %d: %v", id, err)
+ errChan <- err
+ return
+ }
+ defer prod.Close()
+
+ if err := prod.Run(producerCtx); err != nil {
+ log.Printf("Producer %d failed: %v", id, err)
+ errChan <- err
+ return
+ }
+ }(i)
+ }
+
+ // Wait briefly for producers to start producing messages
+ // Reduced from 5s to 2s to minimize message backlog
+ time.Sleep(2 * time.Second)
+
+ // Start consumers
+ for i := 0; i < cfg.Consumers.Count; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+
+ cons, err := consumer.New(cfg, collector, id)
+ if err != nil {
+ log.Printf("Failed to create consumer %d: %v", id, err)
+ return
+ }
+ defer cons.Close()
+
+ cons.Run(consumerCtx)
+ }(i)
+ }
+
+ // Check for producer errors
+ select {
+ case err := <-errChan:
+ log.Printf("Comprehensive test failed due to producer error: %v", err)
+ producerCancel()
+ consumerCancel()
+ return err
+ default:
+ // No immediate error, continue
+ }
+
+ // If duration is set, stop producers first, then allow consumers extra time to drain
+ if cfg.Duration > 0 {
+ go func() {
+ timer := time.NewTimer(cfg.Duration)
+ defer timer.Stop()
+
+ select {
+ case <-timer.C:
+ log.Printf("Test duration (%v) reached, stopping producers", cfg.Duration)
+ producerCancel()
+
+ // Allow consumers extra time to drain remaining messages
+ // Calculate drain time based on test duration (minimum 60s, up to test duration)
+ drainTime := 60 * time.Second
+ if cfg.Duration > drainTime {
+ drainTime = cfg.Duration // Match test duration for longer tests
+ }
+ log.Printf("Allowing %v for consumers to drain remaining messages...", drainTime)
+ time.Sleep(drainTime)
+
+ log.Printf("Stopping consumers after drain period")
+ consumerCancel()
+ cancel()
+ case <-ctx.Done():
+ // Context already cancelled
+ producerCancel()
+ consumerCancel()
+ }
+ }()
+ } else {
+ // No duration set, wait for cancellation and ensure cleanup
+ go func() {
+ <-ctx.Done()
+ producerCancel()
+ consumerCancel()
+ }()
+ }
+
+ return nil
+}
+
+func healthCheck(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprint(w, "OK")
+}
+
+func printHelp() {
+ fmt.Printf(`Kafka Client Load Test for SeaweedFS
+
+Usage: %s [options]
+
+Options:
+ -config string
+ Path to configuration file (default "/config/loadtest.yaml")
+ -mode string
+ Test mode override (producer|consumer|comprehensive)
+ -duration duration
+ Test duration override
+ -help
+ Show this help message
+
+Environment Variables:
+ KAFKA_BOOTSTRAP_SERVERS Comma-separated list of Kafka brokers
+ SCHEMA_REGISTRY_URL URL of the Schema Registry
+ TEST_DURATION Test duration (e.g., "5m", "300s")
+ TEST_MODE Test mode (producer|consumer|comprehensive)
+ PRODUCER_COUNT Number of producer instances
+ CONSUMER_COUNT Number of consumer instances
+ MESSAGE_RATE Messages per second per producer
+ MESSAGE_SIZE Message size in bytes
+ TOPIC_COUNT Number of topics to create
+ PARTITIONS_PER_TOPIC Number of partitions per topic
+ VALUE_TYPE Message value type (json/avro/binary)
+
+Test Modes:
+ producer - Run only producers (generate load)
+ consumer - Run only consumers (consume existing messages)
+ comprehensive - Run both producers and consumers simultaneously
+
+Example:
+ %s -config ./config/loadtest.yaml -mode comprehensive -duration 10m
+
+`, os.Args[0], os.Args[0])
+}
+
+// registerSchemas registers schemas with Schema Registry for all topics
+func registerSchemas(cfg *config.Config) error {
+ // Wait for Schema Registry to be ready
+ if err := waitForSchemaRegistry(cfg.SchemaRegistry.URL); err != nil {
+ return fmt.Errorf("schema registry not ready: %w", err)
+ }
+
+ // Register schemas for each topic with different formats for variety
+ topics := cfg.GetTopicNames()
+
+ // Determine schema formats - use different formats for different topics
+ // This provides comprehensive testing of all schema format variations
+ for i, topic := range topics {
+ var schemaFormat string
+
+ // Distribute topics across three schema formats for comprehensive testing
+ // Format 0: AVRO (default, most common)
+ // Format 1: JSON (modern, human-readable)
+ // Format 2: PROTOBUF (efficient binary format)
+ switch i % 3 {
+ case 0:
+ schemaFormat = "AVRO"
+ case 1:
+ schemaFormat = "JSON"
+ case 2:
+ schemaFormat = "PROTOBUF"
+ }
+
+ // Allow override from config if specified
+ if cfg.Producers.SchemaFormat != "" {
+ schemaFormat = cfg.Producers.SchemaFormat
+ }
+
+ if err := registerTopicSchema(cfg.SchemaRegistry.URL, topic, schemaFormat); err != nil {
+ return fmt.Errorf("failed to register schema for topic %s (format: %s): %w", topic, schemaFormat, err)
+ }
+ log.Printf("Schema registered for topic %s with format: %s", topic, schemaFormat)
+ }
+
+ return nil
+}
+
+// waitForSchemaRegistry waits for Schema Registry to be ready
+func waitForSchemaRegistry(url string) error {
+ maxRetries := 30
+ for i := 0; i < maxRetries; i++ {
+ resp, err := http.Get(url + "/subjects")
+ if err == nil && resp.StatusCode == 200 {
+ resp.Body.Close()
+ return nil
+ }
+ if resp != nil {
+ resp.Body.Close()
+ }
+ time.Sleep(2 * time.Second)
+ }
+ return fmt.Errorf("schema registry not ready after %d retries", maxRetries)
+}
+
+// registerTopicSchema registers a schema for a specific topic
+func registerTopicSchema(registryURL, topicName, schemaFormat string) error {
+ // Determine schema format, default to AVRO
+ if schemaFormat == "" {
+ schemaFormat = "AVRO"
+ }
+
+ var schemaStr string
+ var schemaType string
+
+ switch strings.ToUpper(schemaFormat) {
+ case "AVRO":
+ schemaStr = schema.GetAvroSchema()
+ schemaType = "AVRO"
+ case "JSON", "JSON_SCHEMA":
+ schemaStr = schema.GetJSONSchema()
+ schemaType = "JSON"
+ case "PROTOBUF":
+ schemaStr = schema.GetProtobufSchema()
+ schemaType = "PROTOBUF"
+ default:
+ return fmt.Errorf("unsupported schema format: %s", schemaFormat)
+ }
+
+ schemaReq := map[string]interface{}{
+ "schema": schemaStr,
+ "schemaType": schemaType,
+ }
+
+ jsonData, err := json.Marshal(schemaReq)
+ if err != nil {
+ return err
+ }
+
+ // Register schema for topic value
+ subject := topicName + "-value"
+ url := fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject)
+
+ client := &http.Client{Timeout: 10 * time.Second}
+ resp, err := client.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewBuffer(jsonData))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("schema registration failed: status=%d, body=%s", resp.StatusCode, string(body))
+ }
+
+ log.Printf("Schema registered for topic %s (format: %s)", topicName, schemaType)
+ return nil
+}
diff --git a/test/kafka/kafka-client-loadtest/config/loadtest.yaml b/test/kafka/kafka-client-loadtest/config/loadtest.yaml
new file mode 100644
index 000000000..6a453aab9
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/config/loadtest.yaml
@@ -0,0 +1,169 @@
+# Kafka Client Load Test Configuration
+
+# Test execution settings
+test_mode: "comprehensive" # producer, consumer, comprehensive
+duration: "60s" # Test duration (0 = run indefinitely) - producers will stop at this time, consumers get +120s to drain
+
+# Kafka cluster configuration
+kafka:
+ bootstrap_servers:
+ - "kafka-gateway:9093"
+ # Security settings (if needed)
+ security_protocol: "PLAINTEXT" # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+ sasl_mechanism: "" # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
+ sasl_username: ""
+ sasl_password: ""
+
+# Schema Registry configuration
+schema_registry:
+ url: "http://schema-registry:8081"
+ auth:
+ username: ""
+ password: ""
+
+# Producer configuration
+producers:
+ count: 10 # Number of producer instances
+ message_rate: 1000 # Messages per second per producer
+ message_size: 1024 # Message size in bytes
+ batch_size: 100 # Batch size for batching
+ linger_ms: 5 # Time to wait for batching
+ compression_type: "snappy" # none, gzip, snappy, lz4, zstd
+ acks: "all" # 0, 1, all
+ retries: 3
+ retry_backoff_ms: 100
+ request_timeout_ms: 30000
+ delivery_timeout_ms: 120000
+
+ # Message generation settings
+ key_distribution: "random" # random, sequential, uuid
+ value_type: "avro" # json, avro, protobuf, binary
+ schema_format: "" # AVRO, JSON, PROTOBUF - schema registry format (when schemas enabled)
+ # Leave empty to auto-distribute formats across topics for testing:
+ # topic-0: AVRO, topic-1: JSON, topic-2: PROTOBUF, topic-3: AVRO, topic-4: JSON
+ # Set to specific format (e.g. "AVRO") to use same format for all topics
+ include_timestamp: true
+ include_headers: true
+
+# Consumer configuration
+consumers:
+ count: 5 # Number of consumer instances
+ group_prefix: "loadtest-group" # Consumer group prefix
+ auto_offset_reset: "earliest" # earliest, latest
+ enable_auto_commit: true
+ auto_commit_interval_ms: 1000
+ session_timeout_ms: 30000
+ heartbeat_interval_ms: 3000
+ max_poll_records: 500
+ max_poll_interval_ms: 300000
+ fetch_min_bytes: 1
+ fetch_max_bytes: 52428800 # 50MB
+ fetch_max_wait_ms: 100 # 100ms - very fast polling for concurrent fetches and quick drain
+
+# Topic configuration
+topics:
+ count: 5 # Number of topics to create/use
+ prefix: "loadtest-topic" # Topic name prefix
+ partitions: 4 # Partitions per topic (default: 4)
+ replication_factor: 1 # Replication factor
+ cleanup_policy: "delete" # delete, compact
+ retention_ms: 604800000 # 7 days
+ segment_ms: 86400000 # 1 day
+
+# Schema configuration (for Avro/Protobuf tests)
+schemas:
+ enabled: true
+ registry_timeout_ms: 10000
+
+ # Test schemas
+ user_event:
+ type: "avro"
+ schema: |
+ {
+ "type": "record",
+ "name": "UserEvent",
+ "namespace": "com.seaweedfs.test",
+ "fields": [
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }
+
+ transaction:
+ type: "avro"
+ schema: |
+ {
+ "type": "record",
+ "name": "Transaction",
+ "namespace": "com.seaweedfs.test",
+ "fields": [
+ {"name": "transaction_id", "type": "string"},
+ {"name": "amount", "type": "double"},
+ {"name": "currency", "type": "string"},
+ {"name": "merchant_id", "type": "string"},
+ {"name": "timestamp", "type": "long"}
+ ]
+ }
+
+# Metrics and monitoring
+metrics:
+ enabled: true
+ collection_interval: "10s"
+ prometheus_port: 8080
+
+ # What to measure
+ track_latency: true
+ track_throughput: true
+ track_errors: true
+ track_consumer_lag: true
+
+ # Latency percentiles to track
+ latency_percentiles: [50, 90, 95, 99, 99.9]
+
+# Load test scenarios
+scenarios:
+ # Steady state load test
+ steady_load:
+ producer_rate: 1000 # messages/sec per producer
+ ramp_up_time: "30s"
+ steady_duration: "240s"
+ ramp_down_time: "30s"
+
+ # Burst load test
+ burst_load:
+ base_rate: 500
+ burst_rate: 5000
+ burst_duration: "10s"
+ burst_interval: "60s"
+
+ # Gradual ramp test
+ ramp_test:
+ start_rate: 100
+ end_rate: 2000
+ ramp_duration: "300s"
+ step_duration: "30s"
+
+# Error injection (for resilience testing)
+chaos:
+ enabled: false
+ producer_failure_rate: 0.01 # 1% of producers fail randomly
+ consumer_failure_rate: 0.01 # 1% of consumers fail randomly
+ network_partition_probability: 0.001 # Network issues
+ broker_restart_interval: "0s" # Restart brokers periodically (0s = disabled)
+
+# Output and reporting
+output:
+ results_dir: "/test-results"
+ export_prometheus: true
+ export_csv: true
+ export_json: true
+ real_time_stats: true
+ stats_interval: "30s"
+
+# Logging
+logging:
+ level: "info" # debug, info, warn, error
+ format: "text" # text, json
+  enable_kafka_logs: false  # Enable Kafka client debug logs
\ No newline at end of file
diff --git a/test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml b/test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml
new file mode 100644
index 000000000..e3184941b
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/docker-compose-kafka-compare.yml
@@ -0,0 +1,46 @@
+version: '3.8'
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.5.0
+ hostname: zookeeper
+ container_name: compare-zookeeper
+ ports:
+ - "2181:2181"
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ kafka:
+ image: confluentinc/cp-kafka:7.5.0
+ hostname: kafka
+ container_name: compare-kafka
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+ KAFKA_LOG_RETENTION_HOURS: 1
+ KAFKA_LOG_SEGMENT_BYTES: 1073741824
+
+ schema-registry:
+ image: confluentinc/cp-schema-registry:7.5.0
+ hostname: schema-registry
+ container_name: compare-schema-registry
+ depends_on:
+ - kafka
+ ports:
+ - "8082:8081"
+ environment:
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
+ SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+
diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml
new file mode 100644
index 000000000..54b49ecd2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/docker-compose.yml
@@ -0,0 +1,316 @@
+# SeaweedFS Kafka Client Load Test
+# Tests the full stack: Kafka Clients -> SeaweedFS Kafka Gateway -> SeaweedFS MQ Broker -> Storage
+
+x-seaweedfs-build: &seaweedfs-build
+ build:
+ context: .
+ dockerfile: Dockerfile.seaweedfs
+ args:
+ TARGETARCH: ${GOARCH:-arm64}
+ CACHE_BUST: ${CACHE_BUST:-latest}
+ image: kafka-client-loadtest-seaweedfs
+
+services:
+ # Schema Registry (for Avro/Protobuf support)
+  # Connects to the Kafka gateway at kafka-gateway:9093 on the loadtest network
+ # WORKAROUND: Schema Registry hangs on empty _schemas topic during bootstrap
+ # Pre-create the topic first to avoid "wait to catch up" hang
+ schema-registry-init:
+ image: confluentinc/cp-kafka:8.0.0
+ container_name: loadtest-schema-registry-init
+ networks:
+ - kafka-loadtest-net
+ depends_on:
+ kafka-gateway:
+ condition: service_healthy
+ command: >
+ bash -c "
+ echo 'Creating _schemas topic...';
+ kafka-topics --create --topic _schemas --partitions 1 --replication-factor 1 --bootstrap-server kafka-gateway:9093 --if-not-exists || exit 0;
+ echo '_schemas topic created successfully';
+ "
+
+ schema-registry:
+ image: confluentinc/cp-schema-registry:8.0.0
+ container_name: loadtest-schema-registry
+ restart: on-failure:3
+ ports:
+ - "8081:8081"
+ environment:
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_HOST_PORT: 8081
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-gateway:9093'
+ SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+ SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+ SCHEMA_REGISTRY_DEBUG: "true"
+ SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "full"
+ SCHEMA_REGISTRY_LEADER_ELIGIBILITY: "true"
+ SCHEMA_REGISTRY_MODE: "READWRITE"
+ SCHEMA_REGISTRY_GROUP_ID: "schema-registry"
+ SCHEMA_REGISTRY_KAFKASTORE_GROUP_ID: "schema-registry"
+ SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "PLAINTEXT"
+ SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: "1"
+ SCHEMA_REGISTRY_KAFKASTORE_INIT_TIMEOUT: "120000"
+ SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT: "60000"
+ SCHEMA_REGISTRY_REQUEST_TIMEOUT_MS: "60000"
+ SCHEMA_REGISTRY_RETRY_BACKOFF_MS: "1000"
+ # Force IPv4 to work around Java IPv6 issues
+ # Enable verbose logging and set reasonable memory limits
+ KAFKA_OPTS: "-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Addresses=true -Xmx512M -Xms256M"
+ KAFKA_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/kafka/log4j.properties"
+ SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: "INFO"
+ SCHEMA_REGISTRY_KAFKASTORE_WRITE_TIMEOUT_MS: "60000"
+ SCHEMA_REGISTRY_KAFKASTORE_INIT_RETRY_BACKOFF_MS: "5000"
+ SCHEMA_REGISTRY_KAFKASTORE_CONSUMER_AUTO_OFFSET_RESET: "earliest"
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
+ interval: 15s
+ timeout: 10s
+ retries: 10
+ start_period: 30s
+ depends_on:
+ schema-registry-init:
+ condition: service_completed_successfully
+ kafka-gateway:
+ condition: service_healthy
+ networks:
+ - kafka-loadtest-net
+
+ # SeaweedFS Master (coordinator)
+ seaweedfs-master:
+ <<: *seaweedfs-build
+ container_name: loadtest-seaweedfs-master
+ ports:
+ - "9333:9333"
+ - "19333:19333"
+ command:
+ - master
+ - -ip=seaweedfs-master
+ - -port=9333
+ - -port.grpc=19333
+ - -volumeSizeLimitMB=48
+ - -defaultReplication=000
+ - -garbageThreshold=0.3
+ volumes:
+ - ./data/seaweedfs-master:/data
+ healthcheck:
+ test: ["CMD-SHELL", "wget --quiet --tries=1 --spider http://seaweedfs-master:9333/cluster/status || exit 1"]
+ interval: 10s
+ timeout: 5s
+ retries: 10
+ start_period: 20s
+ networks:
+ - kafka-loadtest-net
+
+ # SeaweedFS Volume Server (storage)
+ seaweedfs-volume:
+ <<: *seaweedfs-build
+ container_name: loadtest-seaweedfs-volume
+ ports:
+ - "8080:8080"
+ - "18080:18080"
+ command:
+ - volume
+ - -mserver=seaweedfs-master:9333
+ - -ip=seaweedfs-volume
+ - -port=8080
+ - -port.grpc=18080
+ - -publicUrl=seaweedfs-volume:8080
+ - -preStopSeconds=1
+ - -compactionMBps=50
+ - -max=0
+ - -dir=/data
+ depends_on:
+ seaweedfs-master:
+ condition: service_healthy
+ volumes:
+ - ./data/seaweedfs-volume:/data
+ healthcheck:
+ test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-volume:8080/status"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ start_period: 15s
+ networks:
+ - kafka-loadtest-net
+
+ # SeaweedFS Filer (metadata)
+ seaweedfs-filer:
+ <<: *seaweedfs-build
+ container_name: loadtest-seaweedfs-filer
+ ports:
+ - "8888:8888"
+ - "18888:18888"
+ - "18889:18889"
+ command:
+ - filer
+ - -master=seaweedfs-master:9333
+ - -ip=seaweedfs-filer
+ - -port=8888
+ - -port.grpc=18888
+ - -metricsPort=18889
+ - -defaultReplicaPlacement=000
+ depends_on:
+ seaweedfs-master:
+ condition: service_healthy
+ seaweedfs-volume:
+ condition: service_healthy
+ volumes:
+ - ./data/seaweedfs-filer:/data
+ healthcheck:
+ test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://seaweedfs-filer:8888/"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ start_period: 15s
+ networks:
+ - kafka-loadtest-net
+
+ # SeaweedFS MQ Broker (message handling)
+ seaweedfs-mq-broker:
+ <<: *seaweedfs-build
+ container_name: loadtest-seaweedfs-mq-broker
+ ports:
+ - "17777:17777"
+ - "18777:18777" # pprof profiling port
+ command:
+ - mq.broker
+ - -master=seaweedfs-master:9333
+ - -ip=seaweedfs-mq-broker
+ - -port=17777
+ - -logFlushInterval=0
+ - -port.pprof=18777
+ depends_on:
+ seaweedfs-filer:
+ condition: service_healthy
+ volumes:
+ - ./data/seaweedfs-mq:/data
+ healthcheck:
+ test: ["CMD", "nc", "-z", "localhost", "17777"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ start_period: 20s
+ networks:
+ - kafka-loadtest-net
+
+ # SeaweedFS Kafka Gateway (Kafka protocol compatibility)
+ kafka-gateway:
+ <<: *seaweedfs-build
+ container_name: loadtest-kafka-gateway
+ ports:
+ - "9093:9093"
+ - "10093:10093" # pprof profiling port
+ command:
+ - mq.kafka.gateway
+ - -master=seaweedfs-master:9333
+ - -ip=kafka-gateway
+ - -ip.bind=0.0.0.0
+ - -port=9093
+ - -default-partitions=4
+ - -schema-registry-url=http://schema-registry:8081
+ - -port.pprof=10093
+ depends_on:
+ seaweedfs-filer:
+ condition: service_healthy
+ seaweedfs-mq-broker:
+ condition: service_healthy
+ environment:
+ - SEAWEEDFS_MASTERS=seaweedfs-master:9333
+ # - KAFKA_DEBUG=1 # Enable debug logging for Schema Registry troubleshooting
+ - KAFKA_ADVERTISED_HOST=kafka-gateway
+ volumes:
+ - ./data/kafka-gateway:/data
+ healthcheck:
+ test: ["CMD", "nc", "-z", "localhost", "9093"]
+ interval: 10s
+ timeout: 5s
+ retries: 10
+ start_period: 45s # Increased to account for 10s startup delay + filer discovery
+ networks:
+ - kafka-loadtest-net
+
+ # Kafka Client Load Test Runner
+ kafka-client-loadtest:
+ build:
+ context: ../../..
+ dockerfile: test/kafka/kafka-client-loadtest/Dockerfile.loadtest
+ container_name: kafka-client-loadtest-runner
+ depends_on:
+ kafka-gateway:
+ condition: service_healthy
+ # schema-registry:
+ # condition: service_healthy
+ environment:
+ - KAFKA_BOOTSTRAP_SERVERS=kafka-gateway:9093
+ - SCHEMA_REGISTRY_URL=http://schema-registry:8081
+ - TEST_DURATION=${TEST_DURATION:-300s}
+ - PRODUCER_COUNT=${PRODUCER_COUNT:-10}
+ - CONSUMER_COUNT=${CONSUMER_COUNT:-5}
+ - MESSAGE_RATE=${MESSAGE_RATE:-1000}
+ - MESSAGE_SIZE=${MESSAGE_SIZE:-1024}
+ - TOPIC_COUNT=${TOPIC_COUNT:-5}
+ - PARTITIONS_PER_TOPIC=${PARTITIONS_PER_TOPIC:-3}
+ - TEST_MODE=${TEST_MODE:-comprehensive}
+ - SCHEMAS_ENABLED=true
+ - VALUE_TYPE=${VALUE_TYPE:-avro}
+ profiles:
+ - loadtest
+ volumes:
+ - ./test-results:/test-results
+ networks:
+ - kafka-loadtest-net
+
+ # Monitoring and Metrics
+ prometheus:
+ image: prom/prometheus:latest
+ container_name: loadtest-prometheus
+ ports:
+ - "9090:9090"
+ volumes:
+ - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
+ - prometheus-data:/prometheus
+ networks:
+ - kafka-loadtest-net
+ profiles:
+ - monitoring
+
+ grafana:
+ image: grafana/grafana:latest
+ container_name: loadtest-grafana
+ ports:
+ - "3000:3000"
+ environment:
+ - GF_SECURITY_ADMIN_PASSWORD=admin
+ volumes:
+ - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
+ - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
+ - grafana-data:/var/lib/grafana
+ networks:
+ - kafka-loadtest-net
+ profiles:
+ - monitoring
+
+ # Schema Registry Debug Runner
+ schema-registry-debug:
+ build:
+ context: debug-client
+ dockerfile: Dockerfile
+ container_name: schema-registry-debug-runner
+ depends_on:
+ kafka-gateway:
+ condition: service_healthy
+ networks:
+ - kafka-loadtest-net
+ profiles:
+ - debug
+
+volumes:
+ prometheus-data:
+ grafana-data:
+
+networks:
+ kafka-loadtest-net:
+ driver: bridge
+ name: kafka-client-loadtest
+
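+# Example usage (a sketch of common invocations; adjust to your workflow):
+#   docker compose up -d                       # SeaweedFS stack + Kafka gateway + Schema Registry
+#   docker compose --profile loadtest up       # additionally run the load test client
+#   docker compose --profile monitoring up -d  # additionally start Prometheus and Grafana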
diff --git a/test/kafka/kafka-client-loadtest/go.mod b/test/kafka/kafka-client-loadtest/go.mod
new file mode 100644
index 000000000..6ebbfc396
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/go.mod
@@ -0,0 +1,41 @@
+module github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest
+
+go 1.24.0
+
+toolchain go1.24.7
+
+require (
+ github.com/IBM/sarama v1.46.1
+ github.com/linkedin/goavro/v2 v2.14.0
+ github.com/prometheus/client_golang v1.23.2
+ gopkg.in/yaml.v3 v3.0.1
+)
+
+require (
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/eapache/go-resiliency v1.7.0 // indirect
+ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
+ github.com/eapache/queue v1.1.0 // indirect
+ github.com/golang/snappy v1.0.0 // indirect
+ github.com/hashicorp/go-uuid v1.0.3 // indirect
+ github.com/jcmturner/aescts/v2 v2.0.0 // indirect
+ github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
+ github.com/jcmturner/gofork v1.7.6 // indirect
+ github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
+ github.com/jcmturner/rpc/v2 v2.0.3 // indirect
+ github.com/klauspost/compress v1.18.0 // indirect
+ github.com/kr/text v0.2.0 // indirect
+ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+ github.com/pierrec/lz4/v4 v4.1.22 // indirect
+ github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/common v0.66.1 // indirect
+ github.com/prometheus/procfs v0.16.1 // indirect
+ github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
+ go.yaml.in/yaml/v2 v2.4.2 // indirect
+ golang.org/x/crypto v0.42.0 // indirect
+ golang.org/x/net v0.44.0 // indirect
+ golang.org/x/sys v0.36.0 // indirect
+ google.golang.org/protobuf v1.36.8 // indirect
+)
diff --git a/test/kafka/kafka-client-loadtest/go.sum b/test/kafka/kafka-client-loadtest/go.sum
new file mode 100644
index 000000000..d1869c0fc
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/go.sum
@@ -0,0 +1,129 @@
+github.com/IBM/sarama v1.46.1 h1:AlDkvyQm4LKktoQZxv0sbTfH3xukeH7r/UFBbUmFV9M=
+github.com/IBM/sarama v1.46.1/go.mod h1:ipyOREIx+o9rMSrrPGLZHGuT0mzecNzKd19Quq+Q8AA=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
+github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
+github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
+github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
+github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
+github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
+github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
+github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
+github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
+github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
+github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
+github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
+github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
+github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
+github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
+github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
+github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
+github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
+github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
+github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
+github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
+github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
+github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
+github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
+github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
+github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
+github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
+github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
+github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
+github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
+github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
+github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
+github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
+go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
+golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
+golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
+golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
+google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/kafka/kafka-client-loadtest/internal/config/config.go b/test/kafka/kafka-client-loadtest/internal/config/config.go
new file mode 100644
index 000000000..dd9f6d6b2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/config/config.go
@@ -0,0 +1,361 @@
+package config
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+
+ "gopkg.in/yaml.v3"
+)
+
+// Config represents the complete load test configuration
+type Config struct {
+ TestMode string `yaml:"test_mode"`
+ Duration time.Duration `yaml:"duration"`
+
+ Kafka KafkaConfig `yaml:"kafka"`
+ SchemaRegistry SchemaRegistryConfig `yaml:"schema_registry"`
+ Producers ProducersConfig `yaml:"producers"`
+ Consumers ConsumersConfig `yaml:"consumers"`
+ Topics TopicsConfig `yaml:"topics"`
+ Schemas SchemasConfig `yaml:"schemas"`
+ Metrics MetricsConfig `yaml:"metrics"`
+ Scenarios ScenariosConfig `yaml:"scenarios"`
+ Chaos ChaosConfig `yaml:"chaos"`
+ Output OutputConfig `yaml:"output"`
+ Logging LoggingConfig `yaml:"logging"`
+}
+
+type KafkaConfig struct {
+ BootstrapServers []string `yaml:"bootstrap_servers"`
+ SecurityProtocol string `yaml:"security_protocol"`
+ SASLMechanism string `yaml:"sasl_mechanism"`
+ SASLUsername string `yaml:"sasl_username"`
+ SASLPassword string `yaml:"sasl_password"`
+}
+
+type SchemaRegistryConfig struct {
+ URL string `yaml:"url"`
+ Auth struct {
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ } `yaml:"auth"`
+}
+
+type ProducersConfig struct {
+ Count int `yaml:"count"`
+ MessageRate int `yaml:"message_rate"`
+ MessageSize int `yaml:"message_size"`
+ BatchSize int `yaml:"batch_size"`
+ LingerMs int `yaml:"linger_ms"`
+ CompressionType string `yaml:"compression_type"`
+ Acks string `yaml:"acks"`
+ Retries int `yaml:"retries"`
+ RetryBackoffMs int `yaml:"retry_backoff_ms"`
+ RequestTimeoutMs int `yaml:"request_timeout_ms"`
+ DeliveryTimeoutMs int `yaml:"delivery_timeout_ms"`
+ KeyDistribution string `yaml:"key_distribution"`
+ ValueType string `yaml:"value_type"` // json, avro, protobuf, binary
+ SchemaFormat string `yaml:"schema_format"` // AVRO, JSON, PROTOBUF (schema registry format)
+ IncludeTimestamp bool `yaml:"include_timestamp"`
+ IncludeHeaders bool `yaml:"include_headers"`
+}
+
+type ConsumersConfig struct {
+ Count int `yaml:"count"`
+ GroupPrefix string `yaml:"group_prefix"`
+ AutoOffsetReset string `yaml:"auto_offset_reset"`
+ EnableAutoCommit bool `yaml:"enable_auto_commit"`
+ AutoCommitIntervalMs int `yaml:"auto_commit_interval_ms"`
+ SessionTimeoutMs int `yaml:"session_timeout_ms"`
+ HeartbeatIntervalMs int `yaml:"heartbeat_interval_ms"`
+ MaxPollRecords int `yaml:"max_poll_records"`
+ MaxPollIntervalMs int `yaml:"max_poll_interval_ms"`
+ FetchMinBytes int `yaml:"fetch_min_bytes"`
+ FetchMaxBytes int `yaml:"fetch_max_bytes"`
+ FetchMaxWaitMs int `yaml:"fetch_max_wait_ms"`
+}
+
+type TopicsConfig struct {
+ Count int `yaml:"count"`
+ Prefix string `yaml:"prefix"`
+ Partitions int `yaml:"partitions"`
+ ReplicationFactor int `yaml:"replication_factor"`
+ CleanupPolicy string `yaml:"cleanup_policy"`
+ RetentionMs int64 `yaml:"retention_ms"`
+ SegmentMs int64 `yaml:"segment_ms"`
+}
+
+type SchemaConfig struct {
+ Type string `yaml:"type"`
+ Schema string `yaml:"schema"`
+}
+
+type SchemasConfig struct {
+ Enabled bool `yaml:"enabled"`
+ RegistryTimeoutMs int `yaml:"registry_timeout_ms"`
+ UserEvent SchemaConfig `yaml:"user_event"`
+ Transaction SchemaConfig `yaml:"transaction"`
+}
+
+type MetricsConfig struct {
+ Enabled bool `yaml:"enabled"`
+ CollectionInterval time.Duration `yaml:"collection_interval"`
+ PrometheusPort int `yaml:"prometheus_port"`
+ TrackLatency bool `yaml:"track_latency"`
+ TrackThroughput bool `yaml:"track_throughput"`
+ TrackErrors bool `yaml:"track_errors"`
+ TrackConsumerLag bool `yaml:"track_consumer_lag"`
+ LatencyPercentiles []float64 `yaml:"latency_percentiles"`
+}
+
+type ScenarioConfig struct {
+ ProducerRate int `yaml:"producer_rate"`
+ RampUpTime time.Duration `yaml:"ramp_up_time"`
+ SteadyDuration time.Duration `yaml:"steady_duration"`
+ RampDownTime time.Duration `yaml:"ramp_down_time"`
+ BaseRate int `yaml:"base_rate"`
+ BurstRate int `yaml:"burst_rate"`
+ BurstDuration time.Duration `yaml:"burst_duration"`
+ BurstInterval time.Duration `yaml:"burst_interval"`
+ StartRate int `yaml:"start_rate"`
+ EndRate int `yaml:"end_rate"`
+ RampDuration time.Duration `yaml:"ramp_duration"`
+ StepDuration time.Duration `yaml:"step_duration"`
+}
+
+type ScenariosConfig struct {
+ SteadyLoad ScenarioConfig `yaml:"steady_load"`
+ BurstLoad ScenarioConfig `yaml:"burst_load"`
+ RampTest ScenarioConfig `yaml:"ramp_test"`
+}
+
+type ChaosConfig struct {
+ Enabled bool `yaml:"enabled"`
+ ProducerFailureRate float64 `yaml:"producer_failure_rate"`
+ ConsumerFailureRate float64 `yaml:"consumer_failure_rate"`
+ NetworkPartitionProbability float64 `yaml:"network_partition_probability"`
+ BrokerRestartInterval time.Duration `yaml:"broker_restart_interval"`
+}
+
+type OutputConfig struct {
+ ResultsDir string `yaml:"results_dir"`
+ ExportPrometheus bool `yaml:"export_prometheus"`
+ ExportCSV bool `yaml:"export_csv"`
+ ExportJSON bool `yaml:"export_json"`
+ RealTimeStats bool `yaml:"real_time_stats"`
+ StatsInterval time.Duration `yaml:"stats_interval"`
+}
+
+type LoggingConfig struct {
+ Level string `yaml:"level"`
+ Format string `yaml:"format"`
+ EnableKafkaLogs bool `yaml:"enable_kafka_logs"`
+}
+
+// Load reads and parses the configuration file
+func Load(configFile string) (*Config, error) {
+ data, err := os.ReadFile(configFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read config file %s: %w", configFile, err)
+ }
+
+ var cfg Config
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ return nil, fmt.Errorf("failed to parse config file %s: %w", configFile, err)
+ }
+
+ // Apply default values
+ cfg.setDefaults()
+
+ // Apply environment variable overrides
+ cfg.applyEnvOverrides()
+
+ return &cfg, nil
+}
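+
+// A minimal usage sketch (the config path and error handling below are illustrative):
+//
+//	cfg, err := config.Load("config/loadtest.yaml")
+//	if err != nil {
+//		log.Fatalf("load config: %v", err)
+//	}
+//	if err := cfg.Validate(); err != nil {
+//		log.Fatalf("invalid config: %v", err)
+//	}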
+
+// ApplyOverrides applies command-line flag overrides
+func (c *Config) ApplyOverrides(testMode string, duration time.Duration) {
+ if testMode != "" {
+ c.TestMode = testMode
+ }
+ if duration > 0 {
+ c.Duration = duration
+ }
+}
+
+// setDefaults sets default values for optional fields
+func (c *Config) setDefaults() {
+ if c.TestMode == "" {
+ c.TestMode = "comprehensive"
+ }
+
+ if len(c.Kafka.BootstrapServers) == 0 {
+ c.Kafka.BootstrapServers = []string{"kafka-gateway:9093"}
+ }
+
+ if c.SchemaRegistry.URL == "" {
+ c.SchemaRegistry.URL = "http://schema-registry:8081"
+ }
+
+ // Schema support is always enabled since Kafka Gateway now enforces schema-first behavior
+ c.Schemas.Enabled = true
+
+ if c.Producers.Count == 0 {
+ c.Producers.Count = 10
+ }
+
+ if c.Consumers.Count == 0 {
+ c.Consumers.Count = 5
+ }
+
+ if c.Topics.Count == 0 {
+ c.Topics.Count = 5
+ }
+
+ if c.Topics.Prefix == "" {
+ c.Topics.Prefix = "loadtest-topic"
+ }
+
+ if c.Topics.Partitions == 0 {
+ c.Topics.Partitions = 4 // Default to 4 partitions
+ }
+
+ if c.Topics.ReplicationFactor == 0 {
+ c.Topics.ReplicationFactor = 1 // Default to 1 replica
+ }
+
+ if c.Consumers.GroupPrefix == "" {
+ c.Consumers.GroupPrefix = "loadtest-group"
+ }
+
+ if c.Output.ResultsDir == "" {
+ c.Output.ResultsDir = "/test-results"
+ }
+
+ if c.Metrics.CollectionInterval == 0 {
+ c.Metrics.CollectionInterval = 10 * time.Second
+ }
+
+ if c.Output.StatsInterval == 0 {
+ c.Output.StatsInterval = 30 * time.Second
+ }
+}
+
+// applyEnvOverrides applies environment variable overrides
+func (c *Config) applyEnvOverrides() {
+ if servers := os.Getenv("KAFKA_BOOTSTRAP_SERVERS"); servers != "" {
+ c.Kafka.BootstrapServers = strings.Split(servers, ",")
+ }
+
+ if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
+ c.SchemaRegistry.URL = url
+ }
+
+ if mode := os.Getenv("TEST_MODE"); mode != "" {
+ c.TestMode = mode
+ }
+
+ if duration := os.Getenv("TEST_DURATION"); duration != "" {
+ if d, err := time.ParseDuration(duration); err == nil {
+ c.Duration = d
+ }
+ }
+
+ if count := os.Getenv("PRODUCER_COUNT"); count != "" {
+ if i, err := strconv.Atoi(count); err == nil {
+ c.Producers.Count = i
+ }
+ }
+
+ if count := os.Getenv("CONSUMER_COUNT"); count != "" {
+ if i, err := strconv.Atoi(count); err == nil {
+ c.Consumers.Count = i
+ }
+ }
+
+ if rate := os.Getenv("MESSAGE_RATE"); rate != "" {
+ if i, err := strconv.Atoi(rate); err == nil {
+ c.Producers.MessageRate = i
+ }
+ }
+
+ if size := os.Getenv("MESSAGE_SIZE"); size != "" {
+ if i, err := strconv.Atoi(size); err == nil {
+ c.Producers.MessageSize = i
+ }
+ }
+
+ if count := os.Getenv("TOPIC_COUNT"); count != "" {
+ if i, err := strconv.Atoi(count); err == nil {
+ c.Topics.Count = i
+ }
+ }
+
+ if partitions := os.Getenv("PARTITIONS_PER_TOPIC"); partitions != "" {
+ if i, err := strconv.Atoi(partitions); err == nil {
+ c.Topics.Partitions = i
+ }
+ }
+
+ if valueType := os.Getenv("VALUE_TYPE"); valueType != "" {
+ c.Producers.ValueType = valueType
+ }
+
+ if schemaFormat := os.Getenv("SCHEMA_FORMAT"); schemaFormat != "" {
+ c.Producers.SchemaFormat = schemaFormat
+ }
+
+ if enabled := os.Getenv("SCHEMAS_ENABLED"); enabled != "" {
+ c.Schemas.Enabled = enabled == "true"
+ }
+}
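+
+// Environment overrides make it easy to tweak a containerized run without editing the
+// YAML file. A hypothetical invocation (variable names match the lookups above):
+//
+//	KAFKA_BOOTSTRAP_SERVERS=localhost:9093 TEST_DURATION=60s PRODUCER_COUNT=2 MESSAGE_RATE=500 VALUE_TYPE=json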
+
+// GetTopicNames returns the list of topic names to use for testing
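+// For example, with the default prefix "loadtest-topic" and a topic count of 3 this
+// returns loadtest-topic-0, loadtest-topic-1, loadtest-topic-2.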
+func (c *Config) GetTopicNames() []string {
+ topics := make([]string, c.Topics.Count)
+ for i := 0; i < c.Topics.Count; i++ {
+ topics[i] = fmt.Sprintf("%s-%d", c.Topics.Prefix, i)
+ }
+ return topics
+}
+
+// GetConsumerGroupNames returns the list of consumer group names
+func (c *Config) GetConsumerGroupNames() []string {
+ groups := make([]string, c.Consumers.Count)
+ for i := 0; i < c.Consumers.Count; i++ {
+ groups[i] = fmt.Sprintf("%s-%d", c.Consumers.GroupPrefix, i)
+ }
+ return groups
+}
+
+// Validate validates the configuration
+func (c *Config) Validate() error {
+ if c.TestMode != "producer" && c.TestMode != "consumer" && c.TestMode != "comprehensive" {
+ return fmt.Errorf("invalid test mode: %s", c.TestMode)
+ }
+
+ if len(c.Kafka.BootstrapServers) == 0 {
+ return fmt.Errorf("kafka bootstrap servers not specified")
+ }
+
+ if c.Producers.Count <= 0 && (c.TestMode == "producer" || c.TestMode == "comprehensive") {
+ return fmt.Errorf("producer count must be greater than 0 for producer or comprehensive tests")
+ }
+
+ if c.Consumers.Count <= 0 && (c.TestMode == "consumer" || c.TestMode == "comprehensive") {
+ return fmt.Errorf("consumer count must be greater than 0 for consumer or comprehensive tests")
+ }
+
+ if c.Topics.Count <= 0 {
+ return fmt.Errorf("topic count must be greater than 0")
+ }
+
+ if c.Topics.Partitions <= 0 {
+ return fmt.Errorf("partitions per topic must be greater than 0")
+ }
+
+ return nil
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
new file mode 100644
index 000000000..e1c4caa41
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -0,0 +1,626 @@
+package consumer
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "log"
+	"sync"
+	"sync/atomic"
+	"time"
+
+ "github.com/IBM/sarama"
+ "github.com/linkedin/goavro/v2"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+ pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+ "google.golang.org/protobuf/proto"
+)
+
+// Consumer represents a Kafka consumer for load testing
+type Consumer struct {
+ id int
+ config *config.Config
+ metricsCollector *metrics.Collector
+ saramaConsumer sarama.ConsumerGroup
+ useConfluent bool // Always false, Sarama only
+ topics []string
+ consumerGroup string
+ avroCodec *goavro.Codec
+
+ // Schema format tracking per topic
+ schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF)
+
+ // Processing tracking
+ messagesProcessed int64
+ lastOffset map[string]map[int32]int64
+ offsetMutex sync.RWMutex
+}
+
+// New creates a new consumer instance
+func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+ consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id)
+
+ c := &Consumer{
+ id: id,
+ config: cfg,
+ metricsCollector: collector,
+ topics: cfg.GetTopicNames(),
+ consumerGroup: consumerGroup,
+ useConfluent: false, // Use Sarama by default
+ lastOffset: make(map[string]map[int32]int64),
+ schemaFormats: make(map[string]string),
+ }
+
+ // Initialize schema formats for each topic (must match producer logic)
+ // This mirrors the format distribution in cmd/loadtest/main.go registerSchemas()
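+	// With the default five topics and no explicit schema_format this yields:
+	// topic 0 -> AVRO, 1 -> JSON, 2 -> PROTOBUF, 3 -> AVRO, 4 -> JSON.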
+ for i, topic := range c.topics {
+ var schemaFormat string
+ if cfg.Producers.SchemaFormat != "" {
+ // Use explicit config if provided
+ schemaFormat = cfg.Producers.SchemaFormat
+ } else {
+ // Distribute across formats (same as producer)
+ switch i % 3 {
+ case 0:
+ schemaFormat = "AVRO"
+ case 1:
+ schemaFormat = "JSON"
+ case 2:
+ schemaFormat = "PROTOBUF"
+ }
+ }
+ c.schemaFormats[topic] = schemaFormat
+ log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
+ }
+
+ // Initialize consumer based on configuration
+ if c.useConfluent {
+ if err := c.initConfluentConsumer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err)
+ }
+ } else {
+ if err := c.initSaramaConsumer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err)
+ }
+ }
+
+ // Initialize Avro codec if schemas are enabled
+ if cfg.Schemas.Enabled {
+ if err := c.initAvroCodec(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
+ }
+ }
+
+ log.Printf("Consumer %d initialized for group %s", id, consumerGroup)
+ return c, nil
+}
+
+// initSaramaConsumer initializes the Sarama consumer group
+func (c *Consumer) initSaramaConsumer() error {
+ config := sarama.NewConfig()
+
+ // Consumer configuration
+ config.Consumer.Return.Errors = true
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ if c.config.Consumers.AutoOffsetReset == "latest" {
+ config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ }
+
+ // Auto commit configuration
+ config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit
+ config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond
+
+ // Session and heartbeat configuration
+ config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond
+ config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond
+
+ // Fetch configuration
+ config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes)
+ config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default)
+ config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes)
+ config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond
+ config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond
+
+	// Channel buffer size for concurrent partition consumption
+	config.ChannelBufferSize = 256 // Sarama's default is 256; set explicitly so the buffering is visible here
+
+	// Allow more in-flight requests per broker connection so Sarama can fetch
+	// from multiple partitions in parallel
+	config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests
+
+ // Version
+ config.Version = sarama.V2_8_0_0
+
+ // Create consumer group
+ consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
+ if err != nil {
+ return fmt.Errorf("failed to create Sarama consumer group: %w", err)
+ }
+
+ c.saramaConsumer = consumerGroup
+ return nil
+}
+
+// initConfluentConsumer initializes the Confluent Kafka Go consumer
+func (c *Consumer) initConfluentConsumer() error {
+ // Confluent consumer disabled, using Sarama only
+ return fmt.Errorf("confluent consumer not enabled")
+}
+
+// initAvroCodec initializes the Avro codec for schema-based messages
+func (c *Consumer) initAvroCodec() error {
+ // Use the LoadTestMessage schema (matches what producer uses)
+ loadTestSchema := `{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }`
+
+ codec, err := goavro.NewCodec(loadTestSchema)
+ if err != nil {
+ return fmt.Errorf("failed to create Avro codec: %w", err)
+ }
+
+ c.avroCodec = codec
+ return nil
+}
+
+// Run starts the consumer and consumes messages until the context is cancelled
+func (c *Consumer) Run(ctx context.Context) {
+ log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup)
+ defer log.Printf("Consumer %d stopped", c.id)
+
+ if c.useConfluent {
+ c.runConfluentConsumer(ctx)
+ } else {
+ c.runSaramaConsumer(ctx)
+ }
+}
+
+// runSaramaConsumer runs the Sarama consumer group
+func (c *Consumer) runSaramaConsumer(ctx context.Context) {
+ handler := &ConsumerGroupHandler{
+ consumer: c,
+ }
+
+ var wg sync.WaitGroup
+
+ // Start error handler
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case err, ok := <-c.saramaConsumer.Errors():
+ if !ok {
+ return
+ }
+ log.Printf("Consumer %d error: %v", c.id, err)
+ c.metricsCollector.RecordConsumerError()
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ // Start consumer group session
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil {
+ log.Printf("Consumer %d: Error consuming: %v", c.id, err)
+ c.metricsCollector.RecordConsumerError()
+
+ // Wait before retrying
+ select {
+ case <-time.After(5 * time.Second):
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+ }()
+
+ // Start lag monitoring
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ c.monitorConsumerLag(ctx)
+ }()
+
+ // Wait for completion
+ <-ctx.Done()
+ log.Printf("Consumer %d: Context cancelled, shutting down", c.id)
+ wg.Wait()
+}
+
+// runConfluentConsumer runs the Confluent consumer
+func (c *Consumer) runConfluentConsumer(ctx context.Context) {
+ // Confluent consumer disabled, using Sarama only
+ log.Printf("Consumer %d: Confluent consumer not enabled", c.id)
+}
+
+// processMessage processes a consumed message
+func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error {
+ topic := ""
+ if topicPtr != nil {
+ topic = *topicPtr
+ }
+
+ // Update offset tracking
+ c.updateOffset(topic, partition, offset)
+
+ // Decode message based on topic-specific schema format
+ var decodedMessage interface{}
+ var err error
+
+ // Determine schema format for this topic (if schemas are enabled)
+ var schemaFormat string
+ if c.config.Schemas.Enabled {
+ schemaFormat = c.schemaFormats[topic]
+ if schemaFormat == "" {
+ // Fallback to config if topic not in map
+ schemaFormat = c.config.Producers.ValueType
+ }
+ } else {
+ // No schemas, use global value type
+ schemaFormat = c.config.Producers.ValueType
+ }
+
+ // Decode message based on format
+ switch schemaFormat {
+ case "avro", "AVRO":
+ decodedMessage, err = c.decodeAvroMessage(value)
+ case "json", "JSON", "JSON_SCHEMA":
+ decodedMessage, err = c.decodeJSONSchemaMessage(value)
+ case "protobuf", "PROTOBUF":
+ decodedMessage, err = c.decodeProtobufMessage(value)
+ case "binary":
+ decodedMessage, err = c.decodeBinaryMessage(value)
+ default:
+ // Fallback to plain JSON
+ decodedMessage, err = c.decodeJSONMessage(value)
+ }
+
+ if err != nil {
+ return fmt.Errorf("failed to decode message: %w", err)
+ }
+
+ // Note: Removed artificial delay to allow maximum throughput
+ // If you need to simulate processing time, add a configurable delay setting
+ // time.Sleep(time.Millisecond) // Minimal processing delay
+
+	// Record metrics. Use an atomic add: Sarama invokes ConsumeClaim in one goroutine per
+	// claimed partition, so processMessage can run concurrently.
+	c.metricsCollector.RecordConsumedMessage(len(value))
+	processed := atomic.AddInt64(&c.messagesProcessed, 1)
+
+	// Log progress
+	if c.id == 0 && processed%1000 == 0 {
+		log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)",
+			c.id, processed, topic, partition, offset)
+	}
+
+ // Optional: Validate message content (for testing purposes)
+ if c.config.Chaos.Enabled {
+ if err := c.validateMessage(decodedMessage); err != nil {
+ log.Printf("Consumer %d: Message validation failed: %v", c.id, err)
+ }
+ }
+
+ return nil
+}
+
+// decodeJSONMessage decodes a JSON message
+func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) {
+ var message map[string]interface{}
+ if err := json.Unmarshal(value, &message); err != nil {
+ // DEBUG: Log the raw bytes when JSON parsing fails
+ log.Printf("Consumer %d: JSON decode failed. Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v",
+ c.id, len(value), value, string(value), err)
+ return nil, err
+ }
+ return message, nil
+}
+
+// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format)
+func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) {
+ if c.avroCodec == nil {
+ return nil, fmt.Errorf("Avro codec not initialized")
+ }
+
+ // Handle Confluent Wire Format when schemas are enabled
+ var avroData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract Avro data (bytes 5+)
+ avroData = value[5:]
+ } else {
+ // No wire format, use raw data
+ avroData = value
+ }
+
+ native, _, err := c.avroCodec.NativeFromBinary(avroData)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode Avro data: %w", err)
+ }
+
+ return native, nil
+}
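+
+// Confluent Wire Format, as stripped above and in the JSON Schema / Protobuf decoders below:
+//
+//	byte 0     magic byte (0x00)
+//	bytes 1-4  schema ID (uint32, big-endian)
+//	bytes 5+   serialized payload (Avro / JSON / Protobuf)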
+
+// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format)
+func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) {
+ // Handle Confluent Wire Format when schemas are enabled
+ var jsonData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract JSON data (bytes 5+)
+ jsonData = value[5:]
+ } else {
+ // No wire format, use raw data
+ jsonData = value
+ }
+
+ // Decode JSON
+ var message map[string]interface{}
+ if err := json.Unmarshal(jsonData, &message); err != nil {
+ return nil, fmt.Errorf("failed to decode JSON data: %w", err)
+ }
+
+ return message, nil
+}
+
+// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format)
+func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) {
+ // Handle Confluent Wire Format when schemas are enabled
+ var protoData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract Protobuf data (bytes 5+)
+ protoData = value[5:]
+ } else {
+ // No wire format, use raw data
+ protoData = value
+ }
+
+ // Unmarshal protobuf message
+ var protoMsg pb.LoadTestMessage
+ if err := proto.Unmarshal(protoData, &protoMsg); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err)
+ }
+
+ // Convert to map for consistency with other decoders
+ return map[string]interface{}{
+ "id": protoMsg.Id,
+ "timestamp": protoMsg.Timestamp,
+ "producer_id": protoMsg.ProducerId,
+ "counter": protoMsg.Counter,
+ "user_id": protoMsg.UserId,
+ "event_type": protoMsg.EventType,
+ "properties": protoMsg.Properties,
+ }, nil
+}
+
+// decodeBinaryMessage decodes a binary message
+func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) {
+ if len(value) < 20 {
+ return nil, fmt.Errorf("binary message too short")
+ }
+
+ // Extract fields from the binary format:
+ // [producer_id:4][counter:8][timestamp:8][random_data:...]
+
+ producerID := int(value[0])<<24 | int(value[1])<<16 | int(value[2])<<8 | int(value[3])
+
+ var counter int64
+ for i := 0; i < 8; i++ {
+ counter |= int64(value[4+i]) << (56 - i*8)
+ }
+
+ var timestamp int64
+ for i := 0; i < 8; i++ {
+ timestamp |= int64(value[12+i]) << (56 - i*8)
+ }
+
+ return map[string]interface{}{
+ "producer_id": producerID,
+ "counter": counter,
+ "timestamp": timestamp,
+ "data_size": len(value),
+ }, nil
+}
+
+// validateMessage performs basic message validation
+func (c *Consumer) validateMessage(message interface{}) error {
+ // This is a placeholder for message validation logic
+ // In a real load test, you might validate:
+ // - Message structure
+ // - Required fields
+ // - Data consistency
+ // - Schema compliance
+
+ if message == nil {
+ return fmt.Errorf("message is nil")
+ }
+
+ return nil
+}
+
+// updateOffset updates the last seen offset for lag calculation
+func (c *Consumer) updateOffset(topic string, partition int32, offset int64) {
+ c.offsetMutex.Lock()
+ defer c.offsetMutex.Unlock()
+
+ if c.lastOffset[topic] == nil {
+ c.lastOffset[topic] = make(map[int32]int64)
+ }
+ c.lastOffset[topic][partition] = offset
+}
+
+// monitorConsumerLag monitors and reports consumer lag
+func (c *Consumer) monitorConsumerLag(ctx context.Context) {
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ c.reportConsumerLag()
+ }
+ }
+}
+
+// reportConsumerLag calculates and reports consumer lag
+func (c *Consumer) reportConsumerLag() {
+ // This is a simplified lag calculation
+ // In a real implementation, you would query the broker for high water marks
+
+ c.offsetMutex.RLock()
+ defer c.offsetMutex.RUnlock()
+
+ for topic, partitions := range c.lastOffset {
+		for partition := range partitions {
+ // For simplicity, assume lag is always 0 when we're consuming actively
+ // In a real test, you would compare against the high water mark
+ lag := int64(0)
+
+ c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag)
+ }
+ }
+}
+
+// Close closes the consumer and cleans up resources
+func (c *Consumer) Close() error {
+ log.Printf("Consumer %d: Closing", c.id)
+
+ if c.saramaConsumer != nil {
+ return c.saramaConsumer.Close()
+ }
+
+ return nil
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
+type ConsumerGroupHandler struct {
+ consumer *Consumer
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+ return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
+ return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ msgCount := 0
+ for {
+ select {
+ case message, ok := <-claim.Messages():
+ if !ok {
+ return nil
+ }
+ msgCount++
+
+ // Process the message
+ var key []byte
+ if message.Key != nil {
+ key = message.Key
+ }
+
+ if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil {
+ log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err)
+ h.consumer.metricsCollector.RecordConsumerError()
+
+ // Add a small delay for schema validation or other processing errors to avoid overloading
+ // select {
+ // case <-time.After(100 * time.Millisecond):
+ // // Continue after brief delay
+ // case <-session.Context().Done():
+ // return nil
+ // }
+ } else {
+ // Mark message as processed
+ session.MarkMessage(message, "")
+ }
+
+ case <-session.Context().Done():
+ log.Printf("Consumer %d: Session context cancelled for %s[%d]",
+ h.consumer.id, claim.Topic(), claim.Partition())
+ return nil
+ }
+ }
+}
+
+// Helper functions
+
+func joinStrings(strs []string, sep string) string {
+ if len(strs) == 0 {
+ return ""
+ }
+
+ result := strs[0]
+ for i := 1; i < len(strs); i++ {
+ result += sep + strs[i]
+ }
+ return result
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/metrics/collector.go b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go
new file mode 100644
index 000000000..d6a1edb8e
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go
@@ -0,0 +1,353 @@
+package metrics
+
+import (
+ "fmt"
+ "io"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+// Collector handles metrics collection for the load test
+type Collector struct {
+ // Atomic counters for thread-safe operations
+ messagesProduced int64
+ messagesConsumed int64
+ bytesProduced int64
+ bytesConsumed int64
+ producerErrors int64
+ consumerErrors int64
+
+ // Latency tracking
+ latencies []time.Duration
+ latencyMutex sync.RWMutex
+
+ // Consumer lag tracking
+ consumerLag map[string]int64
+ consumerLagMutex sync.RWMutex
+
+ // Test timing
+ startTime time.Time
+
+ // Prometheus metrics
+ prometheusMetrics *PrometheusMetrics
+}
+
+// PrometheusMetrics holds all Prometheus metric definitions
+type PrometheusMetrics struct {
+ MessagesProducedTotal prometheus.Counter
+ MessagesConsumedTotal prometheus.Counter
+ BytesProducedTotal prometheus.Counter
+ BytesConsumedTotal prometheus.Counter
+ ProducerErrorsTotal prometheus.Counter
+ ConsumerErrorsTotal prometheus.Counter
+
+ MessageLatencyHistogram prometheus.Histogram
+ ProducerThroughput prometheus.Gauge
+ ConsumerThroughput prometheus.Gauge
+ ConsumerLagGauge *prometheus.GaugeVec
+
+ ActiveProducers prometheus.Gauge
+ ActiveConsumers prometheus.Gauge
+}
+
+// NewCollector creates a new metrics collector
+func NewCollector() *Collector {
+ return &Collector{
+ startTime: time.Now(),
+ consumerLag: make(map[string]int64),
+ prometheusMetrics: &PrometheusMetrics{
+ MessagesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_messages_produced_total",
+ Help: "Total number of messages produced",
+ }),
+ MessagesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_messages_consumed_total",
+ Help: "Total number of messages consumed",
+ }),
+ BytesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_bytes_produced_total",
+ Help: "Total bytes produced",
+ }),
+ BytesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_bytes_consumed_total",
+ Help: "Total bytes consumed",
+ }),
+ ProducerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_producer_errors_total",
+ Help: "Total number of producer errors",
+ }),
+ ConsumerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_consumer_errors_total",
+ Help: "Total number of consumer errors",
+ }),
+ MessageLatencyHistogram: promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "kafka_loadtest_message_latency_seconds",
+ Help: "Message end-to-end latency in seconds",
+				Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 15 buckets from 1ms to ~16.4s
+ }),
+ ProducerThroughput: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_producer_throughput_msgs_per_sec",
+ Help: "Current producer throughput in messages per second",
+ }),
+ ConsumerThroughput: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_consumer_throughput_msgs_per_sec",
+ Help: "Current consumer throughput in messages per second",
+ }),
+ ConsumerLagGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_consumer_lag_messages",
+ Help: "Consumer lag in messages",
+ }, []string{"consumer_group", "topic", "partition"}),
+ ActiveProducers: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_active_producers",
+ Help: "Number of active producers",
+ }),
+ ActiveConsumers: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_active_consumers",
+ Help: "Number of active consumers",
+ }),
+ },
+ }
+}
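+
+// Note: the promauto constructors above register every metric on the Prometheus default
+// registry, so they can be exposed with promhttp.Handler() from
+// github.com/prometheus/client_golang/prometheus/promhttp wherever the HTTP server is wired up.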
+
+// RecordProducedMessage records a successfully produced message
+func (c *Collector) RecordProducedMessage(size int, latency time.Duration) {
+ atomic.AddInt64(&c.messagesProduced, 1)
+ atomic.AddInt64(&c.bytesProduced, int64(size))
+
+ c.prometheusMetrics.MessagesProducedTotal.Inc()
+ c.prometheusMetrics.BytesProducedTotal.Add(float64(size))
+ c.prometheusMetrics.MessageLatencyHistogram.Observe(latency.Seconds())
+
+ // Store latency for percentile calculations
+ c.latencyMutex.Lock()
+ c.latencies = append(c.latencies, latency)
+ // Keep only recent latencies to avoid memory bloat
+ if len(c.latencies) > 100000 {
+ c.latencies = c.latencies[50000:]
+ }
+ c.latencyMutex.Unlock()
+}
+
+// RecordConsumedMessage records a successfully consumed message
+func (c *Collector) RecordConsumedMessage(size int) {
+ atomic.AddInt64(&c.messagesConsumed, 1)
+ atomic.AddInt64(&c.bytesConsumed, int64(size))
+
+ c.prometheusMetrics.MessagesConsumedTotal.Inc()
+ c.prometheusMetrics.BytesConsumedTotal.Add(float64(size))
+}
+
+// RecordProducerError records a producer error
+func (c *Collector) RecordProducerError() {
+ atomic.AddInt64(&c.producerErrors, 1)
+ c.prometheusMetrics.ProducerErrorsTotal.Inc()
+}
+
+// RecordConsumerError records a consumer error
+func (c *Collector) RecordConsumerError() {
+ atomic.AddInt64(&c.consumerErrors, 1)
+ c.prometheusMetrics.ConsumerErrorsTotal.Inc()
+}
+
+// UpdateConsumerLag updates consumer lag metrics
+func (c *Collector) UpdateConsumerLag(consumerGroup, topic string, partition int32, lag int64) {
+ key := fmt.Sprintf("%s-%s-%d", consumerGroup, topic, partition)
+
+ c.consumerLagMutex.Lock()
+ c.consumerLag[key] = lag
+ c.consumerLagMutex.Unlock()
+
+ c.prometheusMetrics.ConsumerLagGauge.WithLabelValues(
+ consumerGroup, topic, fmt.Sprintf("%d", partition),
+ ).Set(float64(lag))
+}
+
+// UpdateThroughput updates throughput gauges
+func (c *Collector) UpdateThroughput(producerRate, consumerRate float64) {
+ c.prometheusMetrics.ProducerThroughput.Set(producerRate)
+ c.prometheusMetrics.ConsumerThroughput.Set(consumerRate)
+}
+
+// UpdateActiveClients updates active client counts
+func (c *Collector) UpdateActiveClients(producers, consumers int) {
+ c.prometheusMetrics.ActiveProducers.Set(float64(producers))
+ c.prometheusMetrics.ActiveConsumers.Set(float64(consumers))
+}
+
+// GetStats returns current statistics
+func (c *Collector) GetStats() Stats {
+ produced := atomic.LoadInt64(&c.messagesProduced)
+ consumed := atomic.LoadInt64(&c.messagesConsumed)
+ bytesProduced := atomic.LoadInt64(&c.bytesProduced)
+ bytesConsumed := atomic.LoadInt64(&c.bytesConsumed)
+ producerErrors := atomic.LoadInt64(&c.producerErrors)
+ consumerErrors := atomic.LoadInt64(&c.consumerErrors)
+
+ duration := time.Since(c.startTime)
+
+ // Calculate throughput
+ producerThroughput := float64(produced) / duration.Seconds()
+ consumerThroughput := float64(consumed) / duration.Seconds()
+
+ // Calculate latency percentiles
+ var latencyPercentiles map[float64]time.Duration
+ c.latencyMutex.RLock()
+ if len(c.latencies) > 0 {
+ latencyPercentiles = c.calculatePercentiles(c.latencies)
+ }
+ c.latencyMutex.RUnlock()
+
+ // Get consumer lag summary
+ c.consumerLagMutex.RLock()
+ totalLag := int64(0)
+ maxLag := int64(0)
+ for _, lag := range c.consumerLag {
+ totalLag += lag
+ if lag > maxLag {
+ maxLag = lag
+ }
+ }
+ avgLag := float64(0)
+ if len(c.consumerLag) > 0 {
+ avgLag = float64(totalLag) / float64(len(c.consumerLag))
+ }
+ c.consumerLagMutex.RUnlock()
+
+ return Stats{
+ Duration: duration,
+ MessagesProduced: produced,
+ MessagesConsumed: consumed,
+ BytesProduced: bytesProduced,
+ BytesConsumed: bytesConsumed,
+ ProducerErrors: producerErrors,
+ ConsumerErrors: consumerErrors,
+ ProducerThroughput: producerThroughput,
+ ConsumerThroughput: consumerThroughput,
+ LatencyPercentiles: latencyPercentiles,
+ TotalConsumerLag: totalLag,
+ MaxConsumerLag: maxLag,
+ AvgConsumerLag: avgLag,
+ }
+}
+
+// PrintSummary prints a summary of the test statistics
+func (c *Collector) PrintSummary() {
+ stats := c.GetStats()
+
+ fmt.Printf("\n=== Load Test Summary ===\n")
+ fmt.Printf("Test Duration: %v\n", stats.Duration)
+ fmt.Printf("\nMessages:\n")
+ fmt.Printf(" Produced: %d (%.2f MB)\n", stats.MessagesProduced, float64(stats.BytesProduced)/1024/1024)
+ fmt.Printf(" Consumed: %d (%.2f MB)\n", stats.MessagesConsumed, float64(stats.BytesConsumed)/1024/1024)
+ fmt.Printf(" Producer Errors: %d\n", stats.ProducerErrors)
+ fmt.Printf(" Consumer Errors: %d\n", stats.ConsumerErrors)
+
+ fmt.Printf("\nThroughput:\n")
+ fmt.Printf(" Producer: %.2f msgs/sec\n", stats.ProducerThroughput)
+ fmt.Printf(" Consumer: %.2f msgs/sec\n", stats.ConsumerThroughput)
+
+ if stats.LatencyPercentiles != nil {
+ fmt.Printf("\nLatency Percentiles:\n")
+ percentiles := []float64{50, 90, 95, 99, 99.9}
+ for _, p := range percentiles {
+ if latency, exists := stats.LatencyPercentiles[p]; exists {
+ fmt.Printf(" p%.1f: %v\n", p, latency)
+ }
+ }
+ }
+
+ fmt.Printf("\nConsumer Lag:\n")
+ fmt.Printf(" Total: %d messages\n", stats.TotalConsumerLag)
+ fmt.Printf(" Max: %d messages\n", stats.MaxConsumerLag)
+ fmt.Printf(" Average: %.2f messages\n", stats.AvgConsumerLag)
+ fmt.Printf("=========================\n")
+}
+
+// WriteStats writes statistics to a writer (for HTTP endpoint)
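+// Output is plain "name value" lines, for example:
+//   messages_produced 120000
+//   producer_throughput_msgs_per_sec 4000.000000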
+func (c *Collector) WriteStats(w io.Writer) {
+ stats := c.GetStats()
+
+ fmt.Fprintf(w, "# Load Test Statistics\n")
+ fmt.Fprintf(w, "duration_seconds %v\n", stats.Duration.Seconds())
+ fmt.Fprintf(w, "messages_produced %d\n", stats.MessagesProduced)
+ fmt.Fprintf(w, "messages_consumed %d\n", stats.MessagesConsumed)
+ fmt.Fprintf(w, "bytes_produced %d\n", stats.BytesProduced)
+ fmt.Fprintf(w, "bytes_consumed %d\n", stats.BytesConsumed)
+ fmt.Fprintf(w, "producer_errors %d\n", stats.ProducerErrors)
+ fmt.Fprintf(w, "consumer_errors %d\n", stats.ConsumerErrors)
+ fmt.Fprintf(w, "producer_throughput_msgs_per_sec %f\n", stats.ProducerThroughput)
+ fmt.Fprintf(w, "consumer_throughput_msgs_per_sec %f\n", stats.ConsumerThroughput)
+ fmt.Fprintf(w, "total_consumer_lag %d\n", stats.TotalConsumerLag)
+ fmt.Fprintf(w, "max_consumer_lag %d\n", stats.MaxConsumerLag)
+ fmt.Fprintf(w, "avg_consumer_lag %f\n", stats.AvgConsumerLag)
+
+ if stats.LatencyPercentiles != nil {
+ for percentile, latency := range stats.LatencyPercentiles {
+ fmt.Fprintf(w, "latency_p%g_seconds %f\n", percentile, latency.Seconds())
+ }
+ }
+}
+
+// calculatePercentiles calculates latency percentiles
+func (c *Collector) calculatePercentiles(latencies []time.Duration) map[float64]time.Duration {
+ if len(latencies) == 0 {
+ return nil
+ }
+
+ // Make a copy and sort
+ sorted := make([]time.Duration, len(latencies))
+ copy(sorted, latencies)
+ sort.Slice(sorted, func(i, j int) bool {
+ return sorted[i] < sorted[j]
+ })
+
+ percentiles := map[float64]time.Duration{
+ 50: calculatePercentile(sorted, 50),
+ 90: calculatePercentile(sorted, 90),
+ 95: calculatePercentile(sorted, 95),
+ 99: calculatePercentile(sorted, 99),
+ 99.9: calculatePercentile(sorted, 99.9),
+ }
+
+ return percentiles
+}
+
+// calculatePercentile calculates a specific percentile from sorted data
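+// For example, with sorted = [10ms, 20ms, 30ms, 40ms] and percentile = 90,
+// index = 0.9 * 3 = 2.7, so the result interpolates between sorted[2] (30ms)
+// and sorted[3] (40ms) with weight 0.7, yielding 37ms.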
+func calculatePercentile(sorted []time.Duration, percentile float64) time.Duration {
+ if len(sorted) == 0 {
+ return 0
+ }
+
+ index := percentile / 100.0 * float64(len(sorted)-1)
+ if index == float64(int(index)) {
+ return sorted[int(index)]
+ }
+
+ lower := sorted[int(index)]
+ upper := sorted[int(index)+1]
+ weight := index - float64(int(index))
+
+ return time.Duration(float64(lower) + weight*float64(upper-lower))
+}
+
+// Stats represents the current test statistics
+type Stats struct {
+ Duration time.Duration
+ MessagesProduced int64
+ MessagesConsumed int64
+ BytesProduced int64
+ BytesConsumed int64
+ ProducerErrors int64
+ ConsumerErrors int64
+ ProducerThroughput float64
+ ConsumerThroughput float64
+ LatencyPercentiles map[float64]time.Duration
+ TotalConsumerLag int64
+ MaxConsumerLag int64
+ AvgConsumerLag float64
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/producer/producer.go b/test/kafka/kafka-client-loadtest/internal/producer/producer.go
new file mode 100644
index 000000000..167bfeac6
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/producer/producer.go
@@ -0,0 +1,770 @@
+package producer
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/linkedin/goavro/v2"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema"
+ pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+ "google.golang.org/protobuf/proto"
+)
+
+// ErrCircuitBreakerOpen indicates that the circuit breaker is open due to consecutive failures
+var ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
+
+// Producer represents a Kafka producer for load testing
+type Producer struct {
+ id int
+ config *config.Config
+ metricsCollector *metrics.Collector
+ saramaProducer sarama.SyncProducer
+ useConfluent bool
+ topics []string
+ avroCodec *goavro.Codec
+ startTime time.Time // Test run start time for generating unique keys
+
+ // Schema management
+ schemaIDs map[string]int // topic -> schema ID mapping
+ schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, etc.)
+
+ // Rate limiting
+ rateLimiter *time.Ticker
+
+ // Message generation
+ messageCounter int64
+ random *rand.Rand
+
+ // Circuit breaker detection
+ consecutiveFailures int
+}
+
+// Message represents a test message
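+// Serialized as JSON it looks roughly like:
+//   {"id":"msg-1-42","timestamp":1700000000000000000,"producer_id":1,"counter":42,
+//    "user_id":"user-123","event_type":"click","properties":{"country":"US","device_type":"mobile",...}}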
+type Message struct {
+ ID string `json:"id"`
+ Timestamp int64 `json:"timestamp"`
+ ProducerID int `json:"producer_id"`
+ Counter int64 `json:"counter"`
+ UserID string `json:"user_id"`
+ EventType string `json:"event_type"`
+ Properties map[string]interface{} `json:"properties"`
+}
+
+// New creates a new producer instance
+func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, error) {
+ p := &Producer{
+ id: id,
+ config: cfg,
+ metricsCollector: collector,
+ topics: cfg.GetTopicNames(),
+ random: rand.New(rand.NewSource(time.Now().UnixNano() + int64(id))),
+ useConfluent: false, // Use Sarama by default, can be made configurable
+ schemaIDs: make(map[string]int),
+ schemaFormats: make(map[string]string),
+ startTime: time.Now(), // Record test start time for unique key generation
+ }
+
+ // Initialize schema formats for each topic
+ // Distribute across AVRO, JSON, and PROTOBUF formats
+ for i, topic := range p.topics {
+ var schemaFormat string
+ if cfg.Producers.SchemaFormat != "" {
+ // Use explicit config if provided
+ schemaFormat = cfg.Producers.SchemaFormat
+ } else {
+ // Distribute across three formats: AVRO, JSON, PROTOBUF
+ switch i % 3 {
+ case 0:
+ schemaFormat = "AVRO"
+ case 1:
+ schemaFormat = "JSON"
+ case 2:
+ schemaFormat = "PROTOBUF"
+ }
+ }
+ p.schemaFormats[topic] = schemaFormat
+ log.Printf("Producer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
+ }
+
+ // Set up rate limiter if specified
+ if cfg.Producers.MessageRate > 0 {
+ p.rateLimiter = time.NewTicker(time.Second / time.Duration(cfg.Producers.MessageRate))
+ }
+
+ // Initialize Sarama producer
+ if err := p.initSaramaProducer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Sarama producer: %w", err)
+ }
+
+ // Initialize Avro codec and register/fetch schemas if schemas are enabled
+ if cfg.Schemas.Enabled {
+ if err := p.initAvroCodec(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
+ }
+ if err := p.ensureSchemasRegistered(); err != nil {
+ return nil, fmt.Errorf("failed to ensure schemas are registered: %w", err)
+ }
+ if err := p.fetchSchemaIDs(); err != nil {
+ return nil, fmt.Errorf("failed to fetch schema IDs: %w", err)
+ }
+ }
+
+ log.Printf("Producer %d initialized successfully", id)
+ return p, nil
+}
+
+// initSaramaProducer initializes the Sarama producer
+func (p *Producer) initSaramaProducer() error {
+ config := sarama.NewConfig()
+
+ // Producer configuration
+ config.Producer.RequiredAcks = sarama.WaitForAll
+ if p.config.Producers.Acks == "0" {
+ config.Producer.RequiredAcks = sarama.NoResponse
+ } else if p.config.Producers.Acks == "1" {
+ config.Producer.RequiredAcks = sarama.WaitForLocal
+ }
+
+ config.Producer.Retry.Max = p.config.Producers.Retries
+ config.Producer.Retry.Backoff = time.Duration(p.config.Producers.RetryBackoffMs) * time.Millisecond
+ config.Producer.Return.Successes = true
+ config.Producer.Return.Errors = true
+
+ // Compression
+ switch p.config.Producers.CompressionType {
+ case "gzip":
+ config.Producer.Compression = sarama.CompressionGZIP
+ case "snappy":
+ config.Producer.Compression = sarama.CompressionSnappy
+ case "lz4":
+ config.Producer.Compression = sarama.CompressionLZ4
+ case "zstd":
+ config.Producer.Compression = sarama.CompressionZSTD
+ default:
+ config.Producer.Compression = sarama.CompressionNone
+ }
+
+ // Batching
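+	// Flush.Messages and Flush.Frequency roughly correspond to Kafka's batch.size
+	// (counted in messages here, not bytes) and linger.ms; Sarama flushes on a best-effort basis.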
+ config.Producer.Flush.Messages = p.config.Producers.BatchSize
+ config.Producer.Flush.Frequency = time.Duration(p.config.Producers.LingerMs) * time.Millisecond
+
+ // Timeouts
+ config.Net.DialTimeout = 30 * time.Second
+ config.Net.ReadTimeout = 30 * time.Second
+ config.Net.WriteTimeout = 30 * time.Second
+
+ // Version
+ config.Version = sarama.V2_8_0_0
+
+ // Create producer
+ producer, err := sarama.NewSyncProducer(p.config.Kafka.BootstrapServers, config)
+ if err != nil {
+ return fmt.Errorf("failed to create Sarama producer: %w", err)
+ }
+
+ p.saramaProducer = producer
+ return nil
+}
+
+// initAvroCodec initializes the Avro codec for schema-based messages
+func (p *Producer) initAvroCodec() error {
+ // Use the shared LoadTestMessage schema
+ codec, err := goavro.NewCodec(schema.GetAvroSchema())
+ if err != nil {
+ return fmt.Errorf("failed to create Avro codec: %w", err)
+ }
+
+ p.avroCodec = codec
+ return nil
+}
+
+// Run starts the producer and produces messages until the context is cancelled
+func (p *Producer) Run(ctx context.Context) error {
+ log.Printf("Producer %d starting", p.id)
+ defer log.Printf("Producer %d stopped", p.id)
+
+ // Create topics if they don't exist
+ if err := p.createTopics(); err != nil {
+ log.Printf("Producer %d: Failed to create topics: %v", p.id, err)
+ p.metricsCollector.RecordProducerError()
+ return err
+ }
+
+ var wg sync.WaitGroup
+ errChan := make(chan error, 1)
+
+ // Main production loop
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := p.produceMessages(ctx); err != nil {
+ errChan <- err
+ }
+ }()
+
+ // Wait for completion or error
+ select {
+ case <-ctx.Done():
+ log.Printf("Producer %d: Context cancelled, shutting down", p.id)
+ case err := <-errChan:
+ log.Printf("Producer %d: Stopping due to error: %v", p.id, err)
+ return err
+ }
+
+ // Stop rate limiter
+ if p.rateLimiter != nil {
+ p.rateLimiter.Stop()
+ }
+
+ // Wait for goroutines to finish
+ wg.Wait()
+ return nil
+}
+
+// produceMessages is the main message production loop
+func (p *Producer) produceMessages(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ // Rate limiting
+ if p.rateLimiter != nil {
+ select {
+ case <-p.rateLimiter.C:
+ // Proceed
+ case <-ctx.Done():
+ return nil
+ }
+ }
+
+ if err := p.produceMessage(); err != nil {
+ log.Printf("Producer %d: Failed to produce message: %v", p.id, err)
+ p.metricsCollector.RecordProducerError()
+
+ // Check for circuit breaker error
+ if p.isCircuitBreakerError(err) {
+ p.consecutiveFailures++
+ log.Printf("Producer %d: Circuit breaker error detected (%d/%d consecutive failures)",
+ p.id, p.consecutiveFailures, 3)
+
+ // Progressive backoff delay to avoid overloading the gateway
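+				// (500ms after the first consecutive failure, 1s after the second, 1.5s after the third)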
+ backoffDelay := time.Duration(p.consecutiveFailures) * 500 * time.Millisecond
+ log.Printf("Producer %d: Backing off for %v to avoid overloading gateway", p.id, backoffDelay)
+
+ select {
+ case <-time.After(backoffDelay):
+ // Continue after delay
+ case <-ctx.Done():
+ return nil
+ }
+
+ // If we've hit 3 consecutive circuit breaker errors, stop the producer
+ if p.consecutiveFailures >= 3 {
+ log.Printf("Producer %d: Circuit breaker is open - stopping producer after %d consecutive failures",
+ p.id, p.consecutiveFailures)
+ return fmt.Errorf("%w: stopping producer after %d consecutive failures", ErrCircuitBreakerOpen, p.consecutiveFailures)
+ }
+ } else {
+ // Reset counter for non-circuit breaker errors
+ p.consecutiveFailures = 0
+ }
+ } else {
+ // Reset counter on successful message
+ p.consecutiveFailures = 0
+ }
+ }
+ }
+}
+
+// produceMessage produces a single message
+func (p *Producer) produceMessage() error {
+ startTime := time.Now()
+
+ // Select random topic
+ topic := p.topics[p.random.Intn(len(p.topics))]
+
+ // Produce message using Sarama (message will be generated based on topic's schema format)
+ return p.produceSaramaMessage(topic, startTime)
+}
+
+// produceSaramaMessage produces a message using Sarama
+// The message is generated internally based on the topic's schema format
+func (p *Producer) produceSaramaMessage(topic string, startTime time.Time) error {
+ // Generate key
+ key := p.generateMessageKey()
+
+ // If schemas are enabled, wrap in Confluent Wire Format based on topic's schema format
+ var messageValue []byte
+ if p.config.Schemas.Enabled {
+ schemaID, exists := p.schemaIDs[topic]
+ if !exists {
+ return fmt.Errorf("schema ID not found for topic %s", topic)
+ }
+
+ // Get the schema format for this topic
+ schemaFormat := p.schemaFormats[topic]
+
+		// Encode based on the topic's registered schema format, not the config value_type:
+		// the payload encoding must match what the schema registry and gateway expect.
+ var encodedMessage []byte
+ var err error
+ switch schemaFormat {
+ case "AVRO":
+ // For Avro schema, encode as Avro binary
+ encodedMessage, err = p.generateAvroMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as Avro for topic %s: %w", topic, err)
+ }
+ case "JSON":
+ // For JSON schema, encode as JSON
+ encodedMessage, err = p.generateJSONMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as JSON for topic %s: %w", topic, err)
+ }
+ case "PROTOBUF":
+ // For PROTOBUF schema, encode as Protobuf binary
+ encodedMessage, err = p.generateProtobufMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as Protobuf for topic %s: %w", topic, err)
+ }
+ default:
+ // Unknown format - fallback to JSON
+ encodedMessage, err = p.generateJSONMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as JSON (unknown format fallback) for topic %s: %w", topic, err)
+ }
+ }
+
+ // Wrap in Confluent wire format (magic byte + schema ID + payload)
+ messageValue = p.createConfluentWireFormat(schemaID, encodedMessage)
+ } else {
+ // No schemas - generate message based on config value_type
+ var err error
+ messageValue, err = p.generateMessage()
+ if err != nil {
+ return fmt.Errorf("failed to generate message: %w", err)
+ }
+ }
+
+ msg := &sarama.ProducerMessage{
+ Topic: topic,
+ Key: sarama.StringEncoder(key),
+ Value: sarama.ByteEncoder(messageValue),
+ }
+
+ // Add headers if configured
+ if p.config.Producers.IncludeHeaders {
+ msg.Headers = []sarama.RecordHeader{
+ {Key: []byte("producer_id"), Value: []byte(fmt.Sprintf("%d", p.id))},
+ {Key: []byte("timestamp"), Value: []byte(fmt.Sprintf("%d", startTime.UnixNano()))},
+ }
+ }
+
+ // Produce message
+ _, _, err := p.saramaProducer.SendMessage(msg)
+ if err != nil {
+ return err
+ }
+
+ // Record metrics
+ latency := time.Since(startTime)
+ p.metricsCollector.RecordProducedMessage(len(messageValue), latency)
+
+ return nil
+}
+
+// generateMessage generates a test message
+func (p *Producer) generateMessage() ([]byte, error) {
+ p.messageCounter++
+
+ switch p.config.Producers.ValueType {
+ case "avro":
+ return p.generateAvroMessage()
+ case "json":
+ return p.generateJSONMessage()
+ case "binary":
+ return p.generateBinaryMessage()
+ default:
+ return p.generateJSONMessage()
+ }
+}
+
+// generateJSONMessage generates a JSON test message
+func (p *Producer) generateJSONMessage() ([]byte, error) {
+ msg := Message{
+ ID: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter),
+ Timestamp: time.Now().UnixNano(),
+ ProducerID: p.id,
+ Counter: p.messageCounter,
+ UserID: fmt.Sprintf("user-%d", p.random.Intn(10000)),
+ EventType: p.randomEventType(),
+ Properties: map[string]interface{}{
+ "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)),
+ "page_views": fmt.Sprintf("%d", p.random.Intn(100)), // String for Avro map<string,string>
+ "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), // String for Avro map<string,string>
+ "country": p.randomCountry(),
+ "device_type": p.randomDeviceType(),
+ "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)),
+ },
+ }
+
+ // Marshal to JSON (no padding - let natural message size be used)
+ messageBytes, err := json.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+
+ return messageBytes, nil
+}
+
+// generateProtobufMessage generates a Protobuf-encoded message
+func (p *Producer) generateProtobufMessage() ([]byte, error) {
+ // Create protobuf message
+ protoMsg := &pb.LoadTestMessage{
+ Id: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter),
+ Timestamp: time.Now().UnixNano(),
+ ProducerId: int32(p.id),
+ Counter: p.messageCounter,
+ UserId: fmt.Sprintf("user-%d", p.random.Intn(10000)),
+ EventType: p.randomEventType(),
+ Properties: map[string]string{
+ "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)),
+ "page_views": fmt.Sprintf("%d", p.random.Intn(100)),
+ "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)),
+ "country": p.randomCountry(),
+ "device_type": p.randomDeviceType(),
+ "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)),
+ },
+ }
+
+ // Marshal to protobuf binary
+ messageBytes, err := proto.Marshal(protoMsg)
+ if err != nil {
+ return nil, err
+ }
+
+ return messageBytes, nil
+}
+
+// generateAvroMessage generates the Avro binary payload; the Confluent wire-format
+// wrapping is applied by the caller.
+// NOTE: Avro messages are NOT padded - they have their own binary format
+func (p *Producer) generateAvroMessage() ([]byte, error) {
+ if p.avroCodec == nil {
+ return nil, fmt.Errorf("Avro codec not initialized")
+ }
+
+ // Create Avro-compatible record matching the LoadTestMessage schema
+ record := map[string]interface{}{
+ "id": fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter),
+ "timestamp": time.Now().UnixNano(),
+ "producer_id": p.id,
+ "counter": p.messageCounter,
+ "user_id": fmt.Sprintf("user-%d", p.random.Intn(10000)),
+ "event_type": p.randomEventType(),
+ "properties": map[string]interface{}{
+ "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)),
+ "page_views": fmt.Sprintf("%d", p.random.Intn(100)),
+ "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)),
+ "country": p.randomCountry(),
+ "device_type": p.randomDeviceType(),
+ "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)),
+ },
+ }
+
+ // Encode to Avro binary
+ avroBytes, err := p.avroCodec.BinaryFromNative(nil, record)
+ if err != nil {
+ return nil, err
+ }
+
+ return avroBytes, nil
+}
+
+// generateBinaryMessage generates a binary test message (no padding)
+func (p *Producer) generateBinaryMessage() ([]byte, error) {
+	// Simple binary layout (big-endian): [producer_id:4][counter:8][timestamp:8]
+	message := make([]byte, 20)
+
+	binary.BigEndian.PutUint32(message[0:4], uint32(p.id))
+	binary.BigEndian.PutUint64(message[4:12], uint64(p.messageCounter))
+	binary.BigEndian.PutUint64(message[12:20], uint64(time.Now().UnixNano()))
+
+	return message, nil
+}
+
+// generateMessageKey generates a message key based on the configured distribution
+// Keys are prefixed with a test run ID to track messages across test runs
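+// For example, for a run started at 2024-01-02 15:04:05:
+//   sequential: "run-20240102-150405-key-42"
+//   random:     "run-20240102-150405-key-7321"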
+func (p *Producer) generateMessageKey() string {
+ // Use test start time as run ID (format: YYYYMMDD-HHMMSS)
+ runID := p.startTime.Format("20060102-150405")
+
+ switch p.config.Producers.KeyDistribution {
+ case "sequential":
+ return fmt.Sprintf("run-%s-key-%d", runID, p.messageCounter)
+ case "uuid":
+ return fmt.Sprintf("run-%s-uuid-%d-%d-%d", runID, p.id, time.Now().UnixNano(), p.random.Intn(1000000))
+ default: // random
+ return fmt.Sprintf("run-%s-key-%d", runID, p.random.Intn(10000))
+ }
+}
+
+// createTopics creates the test topics if they don't exist
+func (p *Producer) createTopics() error {
+ // Use Sarama admin client to create topics
+ config := sarama.NewConfig()
+ config.Version = sarama.V2_8_0_0
+
+ admin, err := sarama.NewClusterAdmin(p.config.Kafka.BootstrapServers, config)
+ if err != nil {
+ return fmt.Errorf("failed to create admin client: %w", err)
+ }
+ defer admin.Close()
+
+ // Create topic specifications
+ topicSpecs := make(map[string]*sarama.TopicDetail)
+ for _, topic := range p.topics {
+ topicSpecs[topic] = &sarama.TopicDetail{
+ NumPartitions: int32(p.config.Topics.Partitions),
+ ReplicationFactor: int16(p.config.Topics.ReplicationFactor),
+ ConfigEntries: map[string]*string{
+ "cleanup.policy": &p.config.Topics.CleanupPolicy,
+ "retention.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.RetentionMs)),
+ "segment.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.SegmentMs)),
+ },
+ }
+ }
+
+ // Create topics
+ for _, topic := range p.topics {
+ err = admin.CreateTopic(topic, topicSpecs[topic], false)
+		if err != nil && !errors.Is(err, sarama.ErrTopicAlreadyExists) {
+			log.Printf("Producer %d: Warning - failed to create topic %s: %v", p.id, topic, err)
+		} else {
+			log.Printf("Producer %d: Topic %s is ready (created or already exists)", p.id, topic)
+ }
+ }
+
+ return nil
+}
+
+// Close closes the producer and cleans up resources
+func (p *Producer) Close() error {
+ log.Printf("Producer %d: Closing", p.id)
+
+ if p.rateLimiter != nil {
+ p.rateLimiter.Stop()
+ }
+
+ if p.saramaProducer != nil {
+ return p.saramaProducer.Close()
+ }
+
+ return nil
+}
+
+// Helper functions
+
+func stringPtr(s string) *string {
+ return &s
+}
+
+func joinStrings(strs []string, sep string) string {
+	return strings.Join(strs, sep)
+}
+
+func (p *Producer) randomEventType() string {
+ events := []string{"login", "logout", "view", "click", "purchase", "signup", "search", "download"}
+ return events[p.random.Intn(len(events))]
+}
+
+func (p *Producer) randomCountry() string {
+ countries := []string{"US", "CA", "UK", "DE", "FR", "JP", "AU", "BR", "IN", "CN"}
+ return countries[p.random.Intn(len(countries))]
+}
+
+func (p *Producer) randomDeviceType() string {
+ devices := []string{"desktop", "mobile", "tablet", "tv", "watch"}
+ return devices[p.random.Intn(len(devices))]
+}
+
+// fetchSchemaIDs fetches schema IDs from Schema Registry for all topics
+func (p *Producer) fetchSchemaIDs() error {
+ for _, topic := range p.topics {
+ subject := topic + "-value"
+ schemaID, err := p.getSchemaID(subject)
+ if err != nil {
+ return fmt.Errorf("failed to get schema ID for subject %s: %w", subject, err)
+ }
+ p.schemaIDs[topic] = schemaID
+ log.Printf("Producer %d: Fetched schema ID %d for topic %s", p.id, schemaID, topic)
+ }
+ return nil
+}
+
+// getSchemaID fetches the latest schema ID for a subject from Schema Registry
+func (p *Producer) getSchemaID(subject string) (int, error) {
+ url := fmt.Sprintf("%s/subjects/%s/versions/latest", p.config.SchemaRegistry.URL, subject)
+
+ resp, err := http.Get(url)
+ if err != nil {
+ return 0, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ body, _ := io.ReadAll(resp.Body)
+ return 0, fmt.Errorf("failed to get schema: status=%d, body=%s", resp.StatusCode, string(body))
+ }
+
+ var schemaResp struct {
+ ID int `json:"id"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil {
+ return 0, err
+ }
+
+ return schemaResp.ID, nil
+}
+
+// ensureSchemasRegistered ensures that schemas are registered for all topics
+// It registers schemas if they don't exist, but doesn't fail if they already do
+func (p *Producer) ensureSchemasRegistered() error {
+ for _, topic := range p.topics {
+ subject := topic + "-value"
+
+ // First check if schema already exists
+ schemaID, err := p.getSchemaID(subject)
+ if err == nil {
+ log.Printf("Producer %d: Schema already exists for topic %s (ID: %d), skipping registration", p.id, topic, schemaID)
+ continue
+ }
+
+ // Schema doesn't exist, register it
+ log.Printf("Producer %d: Registering schema for topic %s", p.id, topic)
+ if err := p.registerTopicSchema(subject); err != nil {
+ return fmt.Errorf("failed to register schema for topic %s: %w", topic, err)
+ }
+ log.Printf("Producer %d: Schema registered successfully for topic %s", p.id, topic)
+ }
+ return nil
+}
+
+// registerTopicSchema registers the schema for a specific topic based on configured format
+func (p *Producer) registerTopicSchema(subject string) error {
+ // Extract topic name from subject (remove -value or -key suffix)
+ topicName := strings.TrimSuffix(strings.TrimSuffix(subject, "-value"), "-key")
+
+ // Get schema format for this topic
+ schemaFormat, ok := p.schemaFormats[topicName]
+ if !ok {
+ // Fallback to config or default
+ schemaFormat = p.config.Producers.SchemaFormat
+ if schemaFormat == "" {
+ schemaFormat = "AVRO"
+ }
+ }
+
+ var schemaStr string
+ var schemaType string
+
+ switch strings.ToUpper(schemaFormat) {
+ case "AVRO":
+ schemaStr = schema.GetAvroSchema()
+ schemaType = "AVRO"
+ case "JSON", "JSON_SCHEMA":
+ schemaStr = schema.GetJSONSchema()
+ schemaType = "JSON"
+ case "PROTOBUF":
+ schemaStr = schema.GetProtobufSchema()
+ schemaType = "PROTOBUF"
+ default:
+ return fmt.Errorf("unsupported schema format: %s", schemaFormat)
+ }
+
+ url := fmt.Sprintf("%s/subjects/%s/versions", p.config.SchemaRegistry.URL, subject)
+
+ payload := map[string]interface{}{
+ "schema": schemaStr,
+ "schemaType": schemaType,
+ }
+
+ jsonPayload, err := json.Marshal(payload)
+ if err != nil {
+ return fmt.Errorf("failed to marshal schema payload: %w", err)
+ }
+
+ resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", strings.NewReader(string(jsonPayload)))
+ if err != nil {
+ return fmt.Errorf("failed to register schema: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("schema registration failed: status=%d, body=%s", resp.StatusCode, string(body))
+ }
+
+ var registerResp struct {
+ ID int `json:"id"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&registerResp); err != nil {
+ return fmt.Errorf("failed to decode registration response: %w", err)
+ }
+
+ log.Printf("Schema registered with ID: %d (format: %s)", registerResp.ID, schemaType)
+ return nil
+}
+
+// createConfluentWireFormat creates a message in Confluent Wire Format
+// This matches the implementation in weed/mq/kafka/schema/envelope.go CreateConfluentEnvelope
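+// For example, schema ID 7 with a 3-byte payload yields:
+//   [0x00, 0x00, 0x00, 0x00, 0x07, p0, p1, p2]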
+func (p *Producer) createConfluentWireFormat(schemaID int, payload []byte) []byte {
+	// Confluent Wire Format: [magic_byte(1)][schema_id(4)][payload(n)]
+	// magic_byte = 0x00
+	// schema_id  = 4 bytes, big-endian
+	// The payload may be Avro, JSON, or Protobuf encoded.
+	wireFormat := make([]byte, 5+len(payload))
+	wireFormat[0] = 0x00 // Magic byte
+	binary.BigEndian.PutUint32(wireFormat[1:5], uint32(schemaID))
+	copy(wireFormat[5:], payload)
+	return wireFormat
+}
+
+// isCircuitBreakerError checks if an error indicates that the gateway's circuit breaker is open.
+// Broker-returned errors are matched by message text as a heuristic, since they are not wrapped sentinels.
+func (p *Producer) isCircuitBreakerError(err error) bool {
+	return errors.Is(err, ErrCircuitBreakerOpen) ||
+		strings.Contains(strings.ToLower(err.Error()), "circuit breaker")
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto
new file mode 100644
index 000000000..dfe00b72f
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+option go_package = "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb";
+
+message LoadTestMessage {
+ string id = 1;
+ int64 timestamp = 2;
+ int32 producer_id = 3;
+ int64 counter = 4;
+ string user_id = 5;
+ string event_type = 6;
+ map<string, string> properties = 7;
+}
+
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go
new file mode 100644
index 000000000..3ed58aa9e
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go
@@ -0,0 +1,185 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.6
+// protoc v5.29.3
+// source: loadtest.proto
+
+package pb
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type LoadTestMessage struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+ Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+ ProducerId int32 `protobuf:"varint,3,opt,name=producer_id,json=producerId,proto3" json:"producer_id,omitempty"`
+ Counter int64 `protobuf:"varint,4,opt,name=counter,proto3" json:"counter,omitempty"`
+ UserId string `protobuf:"bytes,5,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
+ EventType string `protobuf:"bytes,6,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
+ Properties map[string]string `protobuf:"bytes,7,rep,name=properties,proto3" json:"properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *LoadTestMessage) Reset() {
+ *x = LoadTestMessage{}
+ mi := &file_loadtest_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *LoadTestMessage) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*LoadTestMessage) ProtoMessage() {}
+
+func (x *LoadTestMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_loadtest_proto_msgTypes[0]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use LoadTestMessage.ProtoReflect.Descriptor instead.
+func (*LoadTestMessage) Descriptor() ([]byte, []int) {
+ return file_loadtest_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *LoadTestMessage) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *LoadTestMessage) GetTimestamp() int64 {
+ if x != nil {
+ return x.Timestamp
+ }
+ return 0
+}
+
+func (x *LoadTestMessage) GetProducerId() int32 {
+ if x != nil {
+ return x.ProducerId
+ }
+ return 0
+}
+
+func (x *LoadTestMessage) GetCounter() int64 {
+ if x != nil {
+ return x.Counter
+ }
+ return 0
+}
+
+func (x *LoadTestMessage) GetUserId() string {
+ if x != nil {
+ return x.UserId
+ }
+ return ""
+}
+
+func (x *LoadTestMessage) GetEventType() string {
+ if x != nil {
+ return x.EventType
+ }
+ return ""
+}
+
+func (x *LoadTestMessage) GetProperties() map[string]string {
+ if x != nil {
+ return x.Properties
+ }
+ return nil
+}
+
+var File_loadtest_proto protoreflect.FileDescriptor
+
+const file_loadtest_proto_rawDesc = "" +
+ "\n" +
+ "\x0eloadtest.proto\x12\x16com.seaweedfs.loadtest\"\xca\x02\n" +
+ "\x0fLoadTestMessage\x12\x0e\n" +
+ "\x02id\x18\x01 \x01(\tR\x02id\x12\x1c\n" +
+ "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x1f\n" +
+ "\vproducer_id\x18\x03 \x01(\x05R\n" +
+ "producerId\x12\x18\n" +
+ "\acounter\x18\x04 \x01(\x03R\acounter\x12\x17\n" +
+ "\auser_id\x18\x05 \x01(\tR\x06userId\x12\x1d\n" +
+ "\n" +
+ "event_type\x18\x06 \x01(\tR\teventType\x12W\n" +
+ "\n" +
+ "properties\x18\a \x03(\v27.com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntryR\n" +
+ "properties\x1a=\n" +
+ "\x0fPropertiesEntry\x12\x10\n" +
+ "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
+ "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01BTZRgithub.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pbb\x06proto3"
+
+var (
+ file_loadtest_proto_rawDescOnce sync.Once
+ file_loadtest_proto_rawDescData []byte
+)
+
+func file_loadtest_proto_rawDescGZIP() []byte {
+ file_loadtest_proto_rawDescOnce.Do(func() {
+ file_loadtest_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc)))
+ })
+ return file_loadtest_proto_rawDescData
+}
+
+var file_loadtest_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_loadtest_proto_goTypes = []any{
+ (*LoadTestMessage)(nil), // 0: com.seaweedfs.loadtest.LoadTestMessage
+ nil, // 1: com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry
+}
+var file_loadtest_proto_depIdxs = []int32{
+ 1, // 0: com.seaweedfs.loadtest.LoadTestMessage.properties:type_name -> com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry
+ 1, // [1:1] is the sub-list for method output_type
+ 1, // [1:1] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_loadtest_proto_init() }
+func file_loadtest_proto_init() {
+ if File_loadtest_proto != nil {
+ return
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc)),
+ NumEnums: 0,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_loadtest_proto_goTypes,
+ DependencyIndexes: file_loadtest_proto_depIdxs,
+ MessageInfos: file_loadtest_proto_msgTypes,
+ }.Build()
+ File_loadtest_proto = out.File
+ file_loadtest_proto_goTypes = nil
+ file_loadtest_proto_depIdxs = nil
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/schemas.go b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go
new file mode 100644
index 000000000..011b28ef2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go
@@ -0,0 +1,58 @@
+package schema
+
+// GetAvroSchema returns the Avro schema for load test messages
+func GetAvroSchema() string {
+ return `{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }`
+}
+
+// GetJSONSchema returns the JSON Schema for load test messages
+func GetJSONSchema() string {
+ return `{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "LoadTestMessage",
+ "type": "object",
+ "properties": {
+ "id": {"type": "string"},
+ "timestamp": {"type": "integer"},
+ "producer_id": {"type": "integer"},
+ "counter": {"type": "integer"},
+ "user_id": {"type": "string"},
+ "event_type": {"type": "string"},
+ "properties": {
+ "type": "object",
+ "additionalProperties": {"type": "string"}
+ }
+ },
+ "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"]
+ }`
+}
+
+// GetProtobufSchema returns the Protobuf schema for load test messages
+func GetProtobufSchema() string {
+ return `syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+message LoadTestMessage {
+ string id = 1;
+ int64 timestamp = 2;
+ int32 producer_id = 3;
+ int64 counter = 4;
+ string user_id = 5;
+ string event_type = 6;
+ map<string, string> properties = 7;
+}`
+}
diff --git a/test/kafka/kafka-client-loadtest/loadtest b/test/kafka/kafka-client-loadtest/loadtest
new file mode 100755
index 000000000..e5a23f173
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/loadtest
Binary files differ
diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json
new file mode 100644
index 000000000..3ea04fb68
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/kafka-loadtest.json
@@ -0,0 +1,106 @@
+{
+ "dashboard": {
+ "id": null,
+ "title": "Kafka Client Load Test Dashboard",
+ "tags": ["kafka", "loadtest", "seaweedfs"],
+ "timezone": "browser",
+ "panels": [
+ {
+ "id": 1,
+ "title": "Messages Produced/Consumed",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_messages_produced_total[5m])",
+ "legendFormat": "Produced/sec"
+ },
+ {
+ "expr": "rate(kafka_loadtest_messages_consumed_total[5m])",
+ "legendFormat": "Consumed/sec"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
+ },
+ {
+ "id": 2,
+ "title": "Message Latency",
+ "type": "graph",
+ "targets": [
+ {
+          "expr": "histogram_quantile(0.95, rate(kafka_loadtest_message_latency_seconds_bucket[5m]))",
+ "legendFormat": "95th percentile"
+ },
+ {
+          "expr": "histogram_quantile(0.99, rate(kafka_loadtest_message_latency_seconds_bucket[5m]))",
+ "legendFormat": "99th percentile"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
+ },
+ {
+ "id": 3,
+ "title": "Error Rates",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_producer_errors_total[5m])",
+ "legendFormat": "Producer Errors/sec"
+ },
+ {
+ "expr": "rate(kafka_loadtest_consumer_errors_total[5m])",
+ "legendFormat": "Consumer Errors/sec"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
+ },
+ {
+ "id": 4,
+ "title": "Throughput (MB/s)",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_bytes_produced_total[5m]) / 1024 / 1024",
+ "legendFormat": "Produced MB/s"
+ },
+ {
+ "expr": "rate(kafka_loadtest_bytes_consumed_total[5m]) / 1024 / 1024",
+ "legendFormat": "Consumed MB/s"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16}
+ },
+ {
+ "id": 5,
+ "title": "Active Clients",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "kafka_loadtest_active_producers",
+ "legendFormat": "Producers"
+ },
+ {
+ "expr": "kafka_loadtest_active_consumers",
+ "legendFormat": "Consumers"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16}
+ },
+ {
+ "id": 6,
+ "title": "Consumer Lag",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "kafka_loadtest_consumer_lag_messages",
+ "legendFormat": "{{consumer_group}}-{{topic}}-{{partition}}"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24}
+ }
+ ],
+ "time": {"from": "now-30m", "to": "now"},
+ "refresh": "5s",
+ "schemaVersion": 16,
+ "version": 0
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json
new file mode 100644
index 000000000..4c2261f22
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/dashboards/seaweedfs.json
@@ -0,0 +1,62 @@
+{
+ "dashboard": {
+ "id": null,
+ "title": "SeaweedFS Cluster Dashboard",
+ "tags": ["seaweedfs", "storage"],
+ "timezone": "browser",
+ "panels": [
+ {
+ "id": 1,
+ "title": "Master Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-master\"}",
+ "legendFormat": "Master Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 0, "y": 0}
+ },
+ {
+ "id": 2,
+ "title": "Volume Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-volume\"}",
+ "legendFormat": "Volume Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 6, "y": 0}
+ },
+ {
+ "id": 3,
+ "title": "Filer Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-filer\"}",
+ "legendFormat": "Filer Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 12, "y": 0}
+ },
+ {
+ "id": 4,
+ "title": "MQ Broker Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-mq-broker\"}",
+ "legendFormat": "MQ Broker Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 18, "y": 0}
+ }
+ ],
+ "time": {"from": "now-30m", "to": "now"},
+ "refresh": "10s",
+ "schemaVersion": 16,
+ "version": 0
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml
new file mode 100644
index 000000000..0bcf3d818
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/dashboards/dashboard.yml
@@ -0,0 +1,11 @@
+apiVersion: 1
+
+providers:
+ - name: 'default'
+ orgId: 1
+ folder: ''
+ type: file
+ disableDeletion: false
+ editable: true
+ options:
+ path: /var/lib/grafana/dashboards
diff --git a/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml
new file mode 100644
index 000000000..fb78be722
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/monitoring/grafana/provisioning/datasources/datasource.yml
@@ -0,0 +1,12 @@
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ type: prometheus
+ access: proxy
+ orgId: 1
+ url: http://prometheus:9090
+ basicAuth: false
+ isDefault: true
+ editable: true
+ version: 1
diff --git a/test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml b/test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml
new file mode 100644
index 000000000..f62091d52
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/monitoring/prometheus/prometheus.yml
@@ -0,0 +1,54 @@
+# Prometheus configuration for Kafka Load Test monitoring
+
+global:
+ scrape_interval: 15s
+ evaluation_interval: 15s
+
+rule_files:
+ # - "first_rules.yml"
+ # - "second_rules.yml"
+
+scrape_configs:
+ # Scrape Prometheus itself
+ - job_name: 'prometheus'
+ static_configs:
+ - targets: ['localhost:9090']
+
+ # Scrape load test metrics
+ - job_name: 'kafka-loadtest'
+ static_configs:
+ - targets: ['kafka-client-loadtest-runner:8080']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Master metrics
+ - job_name: 'seaweedfs-master'
+ static_configs:
+ - targets: ['seaweedfs-master:9333']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Volume metrics
+ - job_name: 'seaweedfs-volume'
+ static_configs:
+ - targets: ['seaweedfs-volume:8080']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Filer metrics
+ - job_name: 'seaweedfs-filer'
+ static_configs:
+ - targets: ['seaweedfs-filer:8888']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS MQ Broker metrics (if available)
+ - job_name: 'seaweedfs-mq-broker'
+ static_configs:
+ - targets: ['seaweedfs-mq-broker:17777']
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+
+ # Scrape Kafka Gateway metrics (if available)
+ - job_name: 'kafka-gateway'
+ static_configs:
+ - targets: ['kafka-gateway:9093']
+ metrics_path: '/metrics'
+ scrape_interval: 10s
diff --git a/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh b/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh
new file mode 100755
index 000000000..58cb0f114
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh
@@ -0,0 +1,423 @@
+#!/bin/bash
+
+# Register schemas with Schema Registry for load testing
+# This script registers the necessary schemas before running load tests
+
+set -euo pipefail
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARN]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Configuration
+SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"}
+TIMEOUT=${TIMEOUT:-60}
+CHECK_INTERVAL=${CHECK_INTERVAL:-2}
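+
+# Example invocation against a non-default registry:
+#   SCHEMA_REGISTRY_URL=http://schema-registry:8081 ./register-schemas.sh full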
+
+# Wait for Schema Registry to be ready
+wait_for_schema_registry() {
+ log_info "Waiting for Schema Registry to be ready..."
+
+ local elapsed=0
+ while [[ $elapsed -lt $TIMEOUT ]]; do
+ if curl -sf --max-time 5 "$SCHEMA_REGISTRY_URL/subjects" >/dev/null 2>&1; then
+ log_success "Schema Registry is ready!"
+ return 0
+ fi
+
+ log_info "Schema Registry not ready yet. Waiting ${CHECK_INTERVAL}s... (${elapsed}/${TIMEOUT}s)"
+ sleep $CHECK_INTERVAL
+ elapsed=$((elapsed + CHECK_INTERVAL))
+ done
+
+ log_error "Schema Registry did not become ready within ${TIMEOUT} seconds"
+ return 1
+}
+
+# Register a schema for a subject
+register_schema() {
+ local subject=$1
+ local schema=$2
+ local schema_type=${3:-"AVRO"}
+ local max_attempts=5
+ local attempt=1
+
+ log_info "Registering schema for subject: $subject"
+
+ # Create the schema registration payload
+ local escaped_schema=$(echo "$schema" | jq -Rs .)
+ local payload=$(cat <<EOF
+{
+ "schema": $escaped_schema,
+ "schemaType": "$schema_type"
+}
+EOF
+)
+
+ while [[ $attempt -le $max_attempts ]]; do
+ # Register the schema (with 30 second timeout)
+ local response
+ response=$(curl -s --max-time 30 -X POST \
+ -H "Content-Type: application/vnd.schemaregistry.v1+json" \
+ -d "$payload" \
+ "$SCHEMA_REGISTRY_URL/subjects/$subject/versions" 2>/dev/null)
+
+ if echo "$response" | jq -e '.id' >/dev/null 2>&1; then
+ local schema_id
+ schema_id=$(echo "$response" | jq -r '.id')
+ if [[ $attempt -gt 1 ]]; then
+ log_success "- Schema registered for $subject with ID: $schema_id [attempt $attempt]"
+ else
+ log_success "- Schema registered for $subject with ID: $schema_id"
+ fi
+ return 0
+ fi
+
+ # Check if it's a consumer lag timeout (error_code 50002)
+ local error_code
+ error_code=$(echo "$response" | jq -r '.error_code // empty' 2>/dev/null)
+
+ if [[ "$error_code" == "50002" && $attempt -lt $max_attempts ]]; then
+ # Consumer lag timeout - wait longer for consumer to catch up
+ # Use exponential backoff: 1s, 2s, 4s, 8s
+ local wait_time=$(echo "2 ^ ($attempt - 1)" | bc)
+ log_warning "Schema Registry consumer lag detected for $subject, waiting ${wait_time}s before retry (attempt $attempt)..."
+ sleep "$wait_time"
+ attempt=$((attempt + 1))
+ else
+ # Other error or max attempts reached
+ log_error "x Failed to register schema for $subject"
+ log_error "Response: $response"
+ return 1
+ fi
+ done
+
+ return 1
+}
+
+# Verify a schema exists (single attempt)
+verify_schema() {
+ local subject=$1
+
+ local response
+ response=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" 2>/dev/null)
+
+ if echo "$response" | jq -e '.id' >/dev/null 2>&1; then
+ local schema_id
+ local version
+ schema_id=$(echo "$response" | jq -r '.id')
+ version=$(echo "$response" | jq -r '.version')
+ log_success "- Schema verified for $subject (ID: $schema_id, Version: $version)"
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Verify a schema exists with retry logic (handles Schema Registry consumer lag)
+verify_schema_with_retry() {
+ local subject=$1
+ local max_attempts=10
+ local attempt=1
+
+ log_info "Verifying schema for subject: $subject"
+
+ while [[ $attempt -le $max_attempts ]]; do
+ local response
+ response=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" 2>/dev/null)
+
+ if echo "$response" | jq -e '.id' >/dev/null 2>&1; then
+ local schema_id
+ local version
+ schema_id=$(echo "$response" | jq -r '.id')
+ version=$(echo "$response" | jq -r '.version')
+
+ if [[ $attempt -gt 1 ]]; then
+ log_success "- Schema verified for $subject (ID: $schema_id, Version: $version) [attempt $attempt]"
+ else
+ log_success "- Schema verified for $subject (ID: $schema_id, Version: $version)"
+ fi
+ return 0
+ fi
+
+ # Schema not found, wait and retry (handles Schema Registry consumer lag)
+ if [[ $attempt -lt $max_attempts ]]; then
+            # Linear backoff to ride out Schema Registry consumer lag: 0.5s, 1.0s, 1.5s, 2.0s, ...
+ local wait_time=$(echo "scale=1; 0.5 * $attempt" | bc)
+ sleep "$wait_time"
+ attempt=$((attempt + 1))
+ else
+ log_error "x Schema not found for $subject (tried $max_attempts times)"
+ return 1
+ fi
+ done
+
+ return 1
+}
+
+# Register load test schemas (optimized for batch registration)
+register_loadtest_schemas() {
+ log_info "Registering load test schemas with multiple formats..."
+
+ # Define the Avro schema for load test messages
+ local avro_value_schema='{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }'
+
+ # Define the JSON schema for load test messages
+ local json_value_schema='{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "LoadTestMessage",
+ "type": "object",
+ "properties": {
+ "id": {"type": "string"},
+ "timestamp": {"type": "integer"},
+ "producer_id": {"type": "integer"},
+ "counter": {"type": "integer"},
+ "user_id": {"type": "string"},
+ "event_type": {"type": "string"},
+ "properties": {
+ "type": "object",
+ "additionalProperties": {"type": "string"}
+ }
+ },
+ "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"]
+ }'
+
+ # Define the Protobuf schema for load test messages
+ local protobuf_value_schema='syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+message LoadTestMessage {
+ string id = 1;
+ int64 timestamp = 2;
+ int32 producer_id = 3;
+ int64 counter = 4;
+ string user_id = 5;
+ string event_type = 6;
+ map<string, string> properties = 7;
+}'
+
+ # Define the key schema (simple string)
+ local avro_key_schema='{"type": "string"}'
+ local json_key_schema='{"type": "string"}'
+ local protobuf_key_schema='syntax = "proto3"; message Key { string key = 1; }'
+
+ # Register schemas for all load test topics with different formats
+ local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4")
+ local success_count=0
+ local total_schemas=0
+
+ # Distribute formats: topic-0=AVRO, topic-1=JSON, topic-2=PROTOBUF, topic-3=AVRO, topic-4=JSON
+ local idx=0
+ for topic in "${topics[@]}"; do
+ local format
+ local value_schema
+ local key_schema
+
+ # Determine format based on topic index (same as producer logic)
+ case $((idx % 3)) in
+ 0)
+ format="AVRO"
+ value_schema="$avro_value_schema"
+ key_schema="$avro_key_schema"
+ ;;
+ 1)
+ format="JSON"
+ value_schema="$json_value_schema"
+ key_schema="$json_key_schema"
+ ;;
+ 2)
+ format="PROTOBUF"
+ value_schema="$protobuf_value_schema"
+ key_schema="$protobuf_key_schema"
+ ;;
+ esac
+
+ log_info "Registering $topic with $format schema..."
+
+ # Register value schema
+ if register_schema "${topic}-value" "$value_schema" "$format"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+
+ # Small delay to let Schema Registry consumer process (prevents consumer lag)
+ sleep 0.2
+
+ # Register key schema
+ if register_schema "${topic}-key" "$key_schema" "$format"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+
+ # Small delay to let Schema Registry consumer process (prevents consumer lag)
+ sleep 0.2
+
+ idx=$((idx + 1))
+ done
+
+ log_info "Schema registration summary: $success_count/$total_schemas schemas registered successfully"
+ log_info "Format distribution: topic-0=AVRO, topic-1=JSON, topic-2=PROTOBUF, topic-3=AVRO, topic-4=JSON"
+
+ if [[ $success_count -eq $total_schemas ]]; then
+ log_success "All load test schemas registered successfully with multiple formats!"
+ return 0
+ else
+ log_error "Some schemas failed to register"
+ return 1
+ fi
+}
+
+# Verify all schemas are registered
+verify_loadtest_schemas() {
+ log_info "Verifying load test schemas..."
+
+ local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4")
+ local success_count=0
+ local total_schemas=0
+
+ for topic in "${topics[@]}"; do
+ # Verify value schema with retry (handles Schema Registry consumer lag)
+ if verify_schema_with_retry "${topic}-value"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+
+ # Verify key schema with retry (handles Schema Registry consumer lag)
+ if verify_schema_with_retry "${topic}-key"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+ done
+
+ log_info "Schema verification summary: $success_count/$total_schemas schemas verified"
+
+ if [[ $success_count -eq $total_schemas ]]; then
+ log_success "All load test schemas verified successfully!"
+ return 0
+ else
+ log_error "Some schemas are missing or invalid"
+ return 1
+ fi
+}
+
+# List all registered subjects
+list_subjects() {
+ log_info "Listing all registered subjects..."
+
+ local subjects
+ subjects=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null)
+
+ if echo "$subjects" | jq -e '.[]' >/dev/null 2>&1; then
+ # Use process substitution instead of pipeline to avoid subshell exit code issues
+ while IFS= read -r subject; do
+ log_info " - $subject"
+ done < <(echo "$subjects" | jq -r '.[]')
+ else
+ log_warning "No subjects found or Schema Registry not accessible"
+ fi
+
+ return 0
+}
+
+# Clean up schemas (for testing)
+cleanup_schemas() {
+ log_warning "Cleaning up load test schemas..."
+
+ local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4")
+
+ for topic in "${topics[@]}"; do
+ # Delete value schema (with timeout)
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-value" >/dev/null 2>&1 || true
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-value?permanent=true" >/dev/null 2>&1 || true
+
+ # Delete key schema (with timeout)
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-key" >/dev/null 2>&1 || true
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-key?permanent=true" >/dev/null 2>&1 || true
+ done
+
+ log_success "Schema cleanup completed"
+}
+
+# Main function
+main() {
+ case "${1:-register}" in
+ "register")
+ wait_for_schema_registry
+ register_loadtest_schemas
+ ;;
+ "verify")
+ wait_for_schema_registry
+ verify_loadtest_schemas
+ ;;
+ "list")
+ wait_for_schema_registry
+ list_subjects
+ ;;
+ "cleanup")
+ wait_for_schema_registry
+ cleanup_schemas
+ ;;
+ "full")
+ wait_for_schema_registry
+ register_loadtest_schemas
+ # Wait for Schema Registry consumer to catch up before verification
+ log_info "Waiting 3 seconds for Schema Registry consumer to process all schemas..."
+ sleep 3
+ verify_loadtest_schemas
+ list_subjects
+ ;;
+ *)
+ echo "Usage: $0 [register|verify|list|cleanup|full]"
+ echo ""
+ echo "Commands:"
+ echo " register - Register load test schemas (default)"
+ echo " verify - Verify schemas are registered"
+ echo " list - List all registered subjects"
+ echo " cleanup - Clean up load test schemas"
+ echo " full - Register, verify, and list schemas"
+ echo ""
+ echo "Environment variables:"
+ echo " SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)"
+ echo " TIMEOUT - Maximum time to wait for Schema Registry (default: 60)"
+ echo " CHECK_INTERVAL - Check interval in seconds (default: 2)"
+ exit 1
+ ;;
+ esac
+
+ return 0
+}
+
+main "$@"
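For reference, the script above is typically driven like this; the URL and timeout are just the documented defaults, adjust them to your environment:

```bash
# Register, verify, and list the load test schemas in one pass
SCHEMA_REGISTRY_URL=http://localhost:8081 ./scripts/register-schemas.sh full

# Re-run only the verification step, allowing extra time for the registry to come up
TIMEOUT=120 ./scripts/register-schemas.sh verify
```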
diff --git a/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh b/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh
new file mode 100755
index 000000000..7f6ddc79a
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh
@@ -0,0 +1,480 @@
+#!/bin/bash
+
+# Kafka Client Load Test Runner Script
+# This script helps run various load test scenarios against SeaweedFS Kafka Gateway
+
+set -euo pipefail
+
+# Default configuration
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
+DOCKER_COMPOSE_FILE="$PROJECT_DIR/docker-compose.yml"
+CONFIG_FILE="$PROJECT_DIR/config/loadtest.yaml"
+
+# Default test parameters
+TEST_MODE="comprehensive"
+TEST_DURATION="300s"
+PRODUCER_COUNT=10
+CONSUMER_COUNT=5
+MESSAGE_RATE=1000
+MESSAGE_SIZE=1024
+TOPIC_COUNT=5
+PARTITIONS_PER_TOPIC=3
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+# Function to print colored output
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARNING]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Function to show usage
+show_usage() {
+ cat << EOF
+Kafka Client Load Test Runner
+
+Usage: $0 [OPTIONS] [COMMAND]
+
+Commands:
+ start Start the load test infrastructure and run tests
+ stop Stop all services
+ restart Restart all services
+ status Show service status
+ logs Show logs from all services
+ clean Clean up all resources (volumes, networks, etc.)
+ monitor Start monitoring stack (Prometheus + Grafana)
+  scenarios NAME  Run a predefined test scenario (quick, standard, stress, endurance, burst)
+
+Options:
+ -m, --mode MODE Test mode: producer, consumer, comprehensive (default: comprehensive)
+ -d, --duration DURATION Test duration (default: 300s)
+ -p, --producers COUNT Number of producers (default: 10)
+ -c, --consumers COUNT Number of consumers (default: 5)
+ -r, --rate RATE Messages per second per producer (default: 1000)
+ -s, --size SIZE Message size in bytes (default: 1024)
+ -t, --topics COUNT Number of topics (default: 5)
+ --partitions COUNT Partitions per topic (default: 3)
+ --config FILE Configuration file (default: config/loadtest.yaml)
+ --monitoring Enable monitoring stack
+ --wait-ready Wait for services to be ready before starting tests
+ -v, --verbose Verbose output
+ -h, --help Show this help message
+
+Examples:
+ # Run comprehensive test for 5 minutes
+ $0 start -m comprehensive -d 5m
+
+ # Run producer-only test with high throughput
+ $0 start -m producer -p 20 -r 2000 -d 10m
+
+ # Run consumer-only test
+ $0 start -m consumer -c 10
+
+ # Run with monitoring
+ $0 start --monitoring -d 15m
+
+ # Clean up everything
+ $0 clean
+
+Predefined Scenarios:
+ quick Quick smoke test (1 min, low load)
+ standard Standard load test (5 min, medium load)
+ stress Stress test (10 min, high load)
+ endurance Endurance test (30 min, sustained load)
+ burst Burst test (variable load)
+
+EOF
+}
+
+# Parse command line arguments
+parse_args() {
+ while [[ $# -gt 0 ]]; do
+ case $1 in
+ -m|--mode)
+ TEST_MODE="$2"
+ shift 2
+ ;;
+ -d|--duration)
+ TEST_DURATION="$2"
+ shift 2
+ ;;
+ -p|--producers)
+ PRODUCER_COUNT="$2"
+ shift 2
+ ;;
+ -c|--consumers)
+ CONSUMER_COUNT="$2"
+ shift 2
+ ;;
+ -r|--rate)
+ MESSAGE_RATE="$2"
+ shift 2
+ ;;
+ -s|--size)
+ MESSAGE_SIZE="$2"
+ shift 2
+ ;;
+ -t|--topics)
+ TOPIC_COUNT="$2"
+ shift 2
+ ;;
+ --partitions)
+ PARTITIONS_PER_TOPIC="$2"
+ shift 2
+ ;;
+ --config)
+ CONFIG_FILE="$2"
+ shift 2
+ ;;
+ --monitoring)
+ ENABLE_MONITORING=1
+ shift
+ ;;
+ --wait-ready)
+ WAIT_READY=1
+ shift
+ ;;
+ -v|--verbose)
+ VERBOSE=1
+ shift
+ ;;
+ -h|--help)
+ show_usage
+ exit 0
+ ;;
+ -*)
+ log_error "Unknown option: $1"
+ show_usage
+ exit 1
+ ;;
+            *)
+                if [[ -z "${COMMAND:-}" ]]; then
+                    COMMAND="$1"
+                elif [[ "$COMMAND" == "scenarios" && -z "${SCENARIO:-}" ]]; then
+                    # The scenarios command takes one positional argument: the scenario name
+                    SCENARIO="$1"
+                else
+                    log_error "Multiple commands specified"
+                    show_usage
+                    exit 1
+                fi
+                shift
+                ;;
+ esac
+ done
+}
+
+# Check if Docker and Docker Compose are available
+check_dependencies() {
+ if ! command -v docker &> /dev/null; then
+ log_error "Docker is not installed or not in PATH"
+ exit 1
+ fi
+
+ if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then
+ log_error "Docker Compose is not installed or not in PATH"
+ exit 1
+ fi
+
+ # Use docker compose if available, otherwise docker-compose
+ if docker compose version &> /dev/null; then
+ DOCKER_COMPOSE="docker compose"
+ else
+ DOCKER_COMPOSE="docker-compose"
+ fi
+}
+
+# Wait for services to be ready
+wait_for_services() {
+ log_info "Waiting for services to be ready..."
+
+ local timeout=300 # 5 minutes timeout
+ local elapsed=0
+ local check_interval=5
+
+ while [[ $elapsed -lt $timeout ]]; do
+ if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps --format table | grep -q "healthy"; then
+ if check_service_health; then
+ log_success "All services are ready!"
+ return 0
+ fi
+ fi
+
+ sleep $check_interval
+ elapsed=$((elapsed + check_interval))
+ log_info "Waiting... ($elapsed/${timeout}s)"
+ done
+
+ log_error "Services did not become ready within $timeout seconds"
+ return 1
+}
+
+# Check health of critical services
+check_service_health() {
+    # Check Kafka Gateway (it speaks the Kafka wire protocol, not HTTP, so only probe the TCP port)
+    if ! timeout 3 bash -c "</dev/tcp/localhost/9093" 2>/dev/null; then
+        return 1
+    fi
+
+    # Check Schema Registry
+    if ! curl -s --max-time 5 http://localhost:8081/subjects >/dev/null 2>&1; then
+        return 1
+    fi
+
+ return 0
+}
+
+# Start the load test infrastructure
+start_services() {
+ log_info "Starting SeaweedFS Kafka load test infrastructure..."
+
+ # Set environment variables
+ export TEST_MODE="$TEST_MODE"
+ export TEST_DURATION="$TEST_DURATION"
+ export PRODUCER_COUNT="$PRODUCER_COUNT"
+ export CONSUMER_COUNT="$CONSUMER_COUNT"
+ export MESSAGE_RATE="$MESSAGE_RATE"
+ export MESSAGE_SIZE="$MESSAGE_SIZE"
+ export TOPIC_COUNT="$TOPIC_COUNT"
+ export PARTITIONS_PER_TOPIC="$PARTITIONS_PER_TOPIC"
+
+ # Start core services
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" up -d \
+ seaweedfs-master \
+ seaweedfs-volume \
+ seaweedfs-filer \
+ seaweedfs-mq-broker \
+ kafka-gateway \
+ schema-registry
+
+ # Start monitoring if enabled
+ if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then
+ log_info "Starting monitoring stack..."
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d
+ fi
+
+ # Wait for services to be ready if requested
+ if [[ "${WAIT_READY:-0}" == "1" ]]; then
+ wait_for_services
+ fi
+
+ log_success "Infrastructure started successfully"
+}
+
+# Run the load test
+run_loadtest() {
+ log_info "Starting Kafka client load test..."
+ log_info "Mode: $TEST_MODE, Duration: $TEST_DURATION"
+ log_info "Producers: $PRODUCER_COUNT, Consumers: $CONSUMER_COUNT"
+ log_info "Message Rate: $MESSAGE_RATE msgs/sec, Size: $MESSAGE_SIZE bytes"
+
+ # Run the load test
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest up --abort-on-container-exit kafka-client-loadtest
+
+ # Show test results
+ show_results
+}
+
+# Show test results
+show_results() {
+ log_info "Load test completed! Gathering results..."
+
+ # Get final metrics from the load test container
+ if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps kafka-client-loadtest-runner &>/dev/null; then
+ log_info "Final test statistics:"
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" exec -T kafka-client-loadtest-runner curl -s http://localhost:8080/stats || true
+ fi
+
+ # Show Prometheus metrics if monitoring is enabled
+ if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then
+ log_info "Monitoring dashboards available at:"
+ log_info " Prometheus: http://localhost:9090"
+ log_info " Grafana: http://localhost:3000 (admin/admin)"
+ fi
+
+ # Show where results are stored
+ if [[ -d "$PROJECT_DIR/test-results" ]]; then
+ log_info "Test results saved to: $PROJECT_DIR/test-results/"
+ fi
+}
+
+# Stop services
+stop_services() {
+ log_info "Stopping all services..."
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down
+ log_success "Services stopped"
+}
+
+# Show service status
+show_status() {
+ log_info "Service status:"
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps
+}
+
+# Show logs
+show_logs() {
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" logs -f "${1:-}"
+}
+
+# Clean up all resources
+clean_all() {
+ log_warning "This will remove all volumes, networks, and containers. Are you sure? (y/N)"
+ read -r response
+ if [[ "$response" =~ ^[Yy]$ ]]; then
+ log_info "Cleaning up all resources..."
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down -v --remove-orphans
+
+        # Remove any remaining volumes (|| true so a non-match does not trip pipefail)
+        docker volume ls -q | grep -E "(kafka-client-loadtest|seaweedfs)" | xargs -r docker volume rm || true
+
+        # Remove networks (filter by name; `docker network ls -q` alone prints IDs, which a name grep would never match)
+        docker network ls --filter "name=kafka-client-loadtest" -q | xargs -r docker network rm || true
+
+ log_success "Cleanup completed"
+ else
+ log_info "Cleanup cancelled"
+ fi
+}
+
+# Run predefined scenarios
+run_scenario() {
+ local scenario="$1"
+
+ case "$scenario" in
+ quick)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="1m"
+ PRODUCER_COUNT=2
+ CONSUMER_COUNT=2
+ MESSAGE_RATE=100
+ MESSAGE_SIZE=512
+ TOPIC_COUNT=2
+ ;;
+ standard)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="5m"
+ PRODUCER_COUNT=5
+ CONSUMER_COUNT=3
+ MESSAGE_RATE=500
+ MESSAGE_SIZE=1024
+ TOPIC_COUNT=3
+ ;;
+ stress)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="10m"
+ PRODUCER_COUNT=20
+ CONSUMER_COUNT=10
+ MESSAGE_RATE=2000
+ MESSAGE_SIZE=2048
+ TOPIC_COUNT=10
+ ;;
+ endurance)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="30m"
+ PRODUCER_COUNT=10
+ CONSUMER_COUNT=5
+ MESSAGE_RATE=1000
+ MESSAGE_SIZE=1024
+ TOPIC_COUNT=5
+ ;;
+ burst)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="10m"
+ PRODUCER_COUNT=10
+ CONSUMER_COUNT=5
+ MESSAGE_RATE=1000
+ MESSAGE_SIZE=1024
+ TOPIC_COUNT=5
+ # Note: Burst behavior would be configured in the load test config
+ ;;
+ *)
+ log_error "Unknown scenario: $scenario"
+ log_info "Available scenarios: quick, standard, stress, endurance, burst"
+ exit 1
+ ;;
+ esac
+
+ log_info "Running $scenario scenario..."
+ start_services
+ if [[ "${WAIT_READY:-0}" == "1" ]]; then
+ wait_for_services
+ fi
+ run_loadtest
+}
+
+# Main execution
+main() {
+ if [[ $# -eq 0 ]]; then
+ show_usage
+ exit 0
+ fi
+
+ parse_args "$@"
+ check_dependencies
+
+ case "${COMMAND:-}" in
+ start)
+ start_services
+ run_loadtest
+ ;;
+ stop)
+ stop_services
+ ;;
+ restart)
+ stop_services
+ start_services
+ ;;
+ status)
+ show_status
+ ;;
+ logs)
+ show_logs
+ ;;
+ clean)
+ clean_all
+ ;;
+ monitor)
+ ENABLE_MONITORING=1
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d
+ log_success "Monitoring stack started"
+ log_info "Prometheus: http://localhost:9090"
+ log_info "Grafana: http://localhost:3000 (admin/admin)"
+ ;;
+        scenarios)
+            if [[ -n "${SCENARIO:-}" ]]; then
+                run_scenario "$SCENARIO"
+            else
+                log_error "Please specify a scenario"
+                log_info "Available scenarios: quick, standard, stress, endurance, burst"
+                exit 1
+            fi
+            ;;
+ *)
+ log_error "Unknown command: ${COMMAND:-}"
+ show_usage
+ exit 1
+ ;;
+ esac
+}
+
+# Set default values
+ENABLE_MONITORING=0
+WAIT_READY=0
+VERBOSE=0
+
+# Run main function
+main "$@"
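The start path above amounts to exporting the test parameters and bringing up the compose profiles. A roughly equivalent manual sequence, assuming the same docker-compose.yml and service names (substitute docker-compose if the plugin form is unavailable), is:

```bash
cd test/kafka/kafka-client-loadtest

# start_services passes the test parameters to the containers via the environment
export TEST_MODE=comprehensive TEST_DURATION=5m \
       PRODUCER_COUNT=5 CONSUMER_COUNT=3 MESSAGE_RATE=500 MESSAGE_SIZE=1024 \
       TOPIC_COUNT=3 PARTITIONS_PER_TOPIC=3

docker compose up -d seaweedfs-master seaweedfs-volume seaweedfs-filer \
    seaweedfs-mq-broker kafka-gateway schema-registry
docker compose --profile loadtest up --abort-on-container-exit kafka-client-loadtest
```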
diff --git a/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh b/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh
new file mode 100755
index 000000000..3ea43f998
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh
@@ -0,0 +1,352 @@
+#!/bin/bash
+
+# Setup monitoring for Kafka Client Load Test
+# This script sets up Prometheus and Grafana configurations
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
+MONITORING_DIR="$PROJECT_DIR/monitoring"
+
+# Colors
+GREEN='\033[0;32m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+# Create monitoring directory structure
+setup_directories() {
+ log_info "Setting up monitoring directories..."
+
+ mkdir -p "$MONITORING_DIR/prometheus"
+ mkdir -p "$MONITORING_DIR/grafana/dashboards"
+ mkdir -p "$MONITORING_DIR/grafana/provisioning/dashboards"
+ mkdir -p "$MONITORING_DIR/grafana/provisioning/datasources"
+
+ log_success "Directories created"
+}
+
+# Create Prometheus configuration
+create_prometheus_config() {
+ log_info "Creating Prometheus configuration..."
+
+ cat > "$MONITORING_DIR/prometheus/prometheus.yml" << 'EOF'
+# Prometheus configuration for Kafka Load Test monitoring
+
+global:
+ scrape_interval: 15s
+ evaluation_interval: 15s
+
+rule_files:
+ # - "first_rules.yml"
+ # - "second_rules.yml"
+
+scrape_configs:
+ # Scrape Prometheus itself
+ - job_name: 'prometheus'
+ static_configs:
+ - targets: ['localhost:9090']
+
+ # Scrape load test metrics
+ - job_name: 'kafka-loadtest'
+ static_configs:
+ - targets: ['kafka-client-loadtest-runner:8080']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Master metrics
+ - job_name: 'seaweedfs-master'
+ static_configs:
+ - targets: ['seaweedfs-master:9333']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Volume metrics
+ - job_name: 'seaweedfs-volume'
+ static_configs:
+ - targets: ['seaweedfs-volume:8080']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Filer metrics
+ - job_name: 'seaweedfs-filer'
+ static_configs:
+ - targets: ['seaweedfs-filer:8888']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS MQ Broker metrics (if available)
+ - job_name: 'seaweedfs-mq-broker'
+ static_configs:
+ - targets: ['seaweedfs-mq-broker:17777']
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+
+ # Scrape Kafka Gateway metrics (if available)
+ - job_name: 'kafka-gateway'
+ static_configs:
+ - targets: ['kafka-gateway:9093']
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+EOF
+
+ log_success "Prometheus configuration created"
+}
+
+# Create Grafana datasource configuration
+create_grafana_datasource() {
+ log_info "Creating Grafana datasource configuration..."
+
+ cat > "$MONITORING_DIR/grafana/provisioning/datasources/datasource.yml" << 'EOF'
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ type: prometheus
+ access: proxy
+ orgId: 1
+ url: http://prometheus:9090
+ basicAuth: false
+ isDefault: true
+ editable: true
+ version: 1
+EOF
+
+ log_success "Grafana datasource configuration created"
+}
+
+# Create Grafana dashboard provisioning
+create_grafana_dashboard_provisioning() {
+ log_info "Creating Grafana dashboard provisioning..."
+
+ cat > "$MONITORING_DIR/grafana/provisioning/dashboards/dashboard.yml" << 'EOF'
+apiVersion: 1
+
+providers:
+ - name: 'default'
+ orgId: 1
+ folder: ''
+ type: file
+ disableDeletion: false
+ editable: true
+ options:
+ path: /var/lib/grafana/dashboards
+EOF
+
+ log_success "Grafana dashboard provisioning created"
+}
+
+# Create Kafka Load Test dashboard
+create_loadtest_dashboard() {
+ log_info "Creating Kafka Load Test Grafana dashboard..."
+
+ cat > "$MONITORING_DIR/grafana/dashboards/kafka-loadtest.json" << 'EOF'
+{
+ "dashboard": {
+ "id": null,
+ "title": "Kafka Client Load Test Dashboard",
+ "tags": ["kafka", "loadtest", "seaweedfs"],
+ "timezone": "browser",
+ "panels": [
+ {
+ "id": 1,
+ "title": "Messages Produced/Consumed",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_messages_produced_total[5m])",
+ "legendFormat": "Produced/sec"
+ },
+ {
+ "expr": "rate(kafka_loadtest_messages_consumed_total[5m])",
+ "legendFormat": "Consumed/sec"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
+ },
+ {
+ "id": 2,
+ "title": "Message Latency",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "histogram_quantile(0.95, kafka_loadtest_message_latency_seconds)",
+ "legendFormat": "95th percentile"
+ },
+ {
+ "expr": "histogram_quantile(0.99, kafka_loadtest_message_latency_seconds)",
+ "legendFormat": "99th percentile"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
+ },
+ {
+ "id": 3,
+ "title": "Error Rates",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_producer_errors_total[5m])",
+ "legendFormat": "Producer Errors/sec"
+ },
+ {
+ "expr": "rate(kafka_loadtest_consumer_errors_total[5m])",
+ "legendFormat": "Consumer Errors/sec"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
+ },
+ {
+ "id": 4,
+ "title": "Throughput (MB/s)",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_bytes_produced_total[5m]) / 1024 / 1024",
+ "legendFormat": "Produced MB/s"
+ },
+ {
+ "expr": "rate(kafka_loadtest_bytes_consumed_total[5m]) / 1024 / 1024",
+ "legendFormat": "Consumed MB/s"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16}
+ },
+ {
+ "id": 5,
+ "title": "Active Clients",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "kafka_loadtest_active_producers",
+ "legendFormat": "Producers"
+ },
+ {
+ "expr": "kafka_loadtest_active_consumers",
+ "legendFormat": "Consumers"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16}
+ },
+ {
+ "id": 6,
+ "title": "Consumer Lag",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "kafka_loadtest_consumer_lag_messages",
+ "legendFormat": "{{consumer_group}}-{{topic}}-{{partition}}"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24}
+ }
+ ],
+ "time": {"from": "now-30m", "to": "now"},
+ "refresh": "5s",
+ "schemaVersion": 16,
+ "version": 0
+ }
+}
+EOF
+
+ log_success "Kafka Load Test dashboard created"
+}
+
+# Create SeaweedFS dashboard
+create_seaweedfs_dashboard() {
+ log_info "Creating SeaweedFS Grafana dashboard..."
+
+ cat > "$MONITORING_DIR/grafana/dashboards/seaweedfs.json" << 'EOF'
+{
+ "dashboard": {
+ "id": null,
+ "title": "SeaweedFS Cluster Dashboard",
+ "tags": ["seaweedfs", "storage"],
+ "timezone": "browser",
+ "panels": [
+ {
+ "id": 1,
+ "title": "Master Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-master\"}",
+ "legendFormat": "Master Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 0, "y": 0}
+ },
+ {
+ "id": 2,
+ "title": "Volume Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-volume\"}",
+ "legendFormat": "Volume Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 6, "y": 0}
+ },
+ {
+ "id": 3,
+ "title": "Filer Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-filer\"}",
+ "legendFormat": "Filer Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 12, "y": 0}
+ },
+ {
+ "id": 4,
+ "title": "MQ Broker Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-mq-broker\"}",
+ "legendFormat": "MQ Broker Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 18, "y": 0}
+ }
+ ],
+ "time": {"from": "now-30m", "to": "now"},
+ "refresh": "10s",
+ "schemaVersion": 16,
+ "version": 0
+ }
+}
+EOF
+
+ log_success "SeaweedFS dashboard created"
+}
+
+# Main setup function
+main() {
+ log_info "Setting up monitoring for Kafka Client Load Test..."
+
+ setup_directories
+ create_prometheus_config
+ create_grafana_datasource
+ create_grafana_dashboard_provisioning
+ create_loadtest_dashboard
+ create_seaweedfs_dashboard
+
+ log_success "Monitoring setup completed!"
+ log_info "You can now start the monitoring stack with:"
+ log_info " ./scripts/run-loadtest.sh monitor"
+ log_info ""
+ log_info "After starting, access:"
+ log_info " Prometheus: http://localhost:9090"
+ log_info " Grafana: http://localhost:3000 (admin/admin)"
+}
+
+main "$@"
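If promtool is available (it ships with Prometheus and is also present in the prom/prometheus image), the generated scrape configuration can be sanity-checked before the monitoring profile is started; this step is optional and not part of the script:

```bash
# Local promtool
promtool check config monitoring/prometheus/prometheus.yml

# Or via the Prometheus image if promtool is not installed locally
docker run --rm --entrypoint promtool \
    -v "$PWD/monitoring/prometheus:/cfg:ro" prom/prometheus check config /cfg/prometheus.yml
```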
diff --git a/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh b/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh
new file mode 100755
index 000000000..e1a2f73e2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh
@@ -0,0 +1,151 @@
+#!/bin/bash
+
+# Test script to verify the retry logic works correctly
+# Simulates Schema Registry eventual consistency behavior
+
+set -euo pipefail
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[TEST]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[PASS]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[FAIL]${NC} $1"
+}
+
+# Mock function that simulates Schema Registry eventual consistency
+# First N attempts fail, then succeeds
+mock_schema_registry_query() {
+ local subject=$1
+ local min_attempts_to_succeed=$2
+ local current_attempt=$3
+
+ if [[ $current_attempt -ge $min_attempts_to_succeed ]]; then
+ # Simulate successful response
+ echo '{"id":1,"version":1,"schema":"test"}'
+ return 0
+ else
+ # Simulate 404 Not Found
+ echo '{"error_code":40401,"message":"Subject not found"}'
+ return 1
+ fi
+}
+
+# Simulate verify_schema_with_retry logic
+test_verify_with_retry() {
+ local subject=$1
+ local min_attempts_to_succeed=$2
+ local max_attempts=5
+ local attempt=1
+
+ log_info "Testing $subject (should succeed after $min_attempts_to_succeed attempts)"
+
+ while [[ $attempt -le $max_attempts ]]; do
+ local response
+ if response=$(mock_schema_registry_query "$subject" "$min_attempts_to_succeed" "$attempt"); then
+ if echo "$response" | grep -q '"id"'; then
+ if [[ $attempt -gt 1 ]]; then
+ log_success "$subject verified after $attempt attempts"
+ else
+ log_success "$subject verified on first attempt"
+ fi
+ return 0
+ fi
+ fi
+
+ # Schema not found, wait and retry
+ if [[ $attempt -lt $max_attempts ]]; then
+ # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s
+ local wait_time=$(echo "scale=3; 0.1 * (2 ^ ($attempt - 1))" | bc)
+ log_info " Attempt $attempt failed, waiting ${wait_time}s before retry..."
+ sleep "$wait_time"
+ attempt=$((attempt + 1))
+ else
+ log_error "$subject verification failed after $max_attempts attempts"
+ return 1
+ fi
+ done
+
+ return 1
+}
+
+# Run tests
+log_info "=========================================="
+log_info "Testing Schema Registry Retry Logic"
+log_info "=========================================="
+echo ""
+
+# Test 1: Schema available immediately
+log_info "Test 1: Schema available immediately"
+if test_verify_with_retry "immediate-schema" 1; then
+ log_success "✓ Test 1 passed"
+else
+ log_error "✗ Test 1 failed"
+ exit 1
+fi
+echo ""
+
+# Test 2: Schema available after 2 attempts (~100ms of backoff)
+log_info "Test 2: Schema available after 2 attempts"
+if test_verify_with_retry "delayed-schema-2" 2; then
+ log_success "✓ Test 2 passed"
+else
+ log_error "✗ Test 2 failed"
+ exit 1
+fi
+echo ""
+
+# Test 3: Schema available after 3 attempts (~300ms of backoff)
+log_info "Test 3: Schema available after 3 attempts"
+if test_verify_with_retry "delayed-schema-3" 3; then
+ log_success "✓ Test 3 passed"
+else
+ log_error "✗ Test 3 failed"
+ exit 1
+fi
+echo ""
+
+# Test 4: Schema available after 4 attempts (~700ms of backoff)
+log_info "Test 4: Schema available after 4 attempts"
+if test_verify_with_retry "delayed-schema-4" 4; then
+ log_success "✓ Test 4 passed"
+else
+ log_error "✗ Test 4 failed"
+ exit 1
+fi
+echo ""
+
+# Test 5: Schema never available (should fail)
+log_info "Test 5: Schema never available (should fail gracefully)"
+if test_verify_with_retry "missing-schema" 10; then
+ log_error "✗ Test 5 failed (should have failed but passed)"
+ exit 1
+else
+ log_success "✓ Test 5 passed (correctly failed after max attempts)"
+fi
+echo ""
+
+log_success "=========================================="
+log_success "All tests passed! ✓"
+log_success "=========================================="
+log_info ""
+log_info "Summary:"
+log_info "- Immediate availability: works ✓"
+log_info "- 2-4 retry attempts: works ✓"
+log_info "- Max attempts handling: works ✓"
+log_info "- Exponential backoff: works ✓"
+log_info ""
+log_info "Total retry time budget: ~1.5 seconds (0.1+0.2+0.4+0.8)"
+log_info "This should handle Schema Registry consumer lag gracefully."
+
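The same backoff pattern, pointed at a real subject lookup instead of the mock, would look roughly like the sketch below; it assumes SCHEMA_REGISTRY_URL is set and uses the standard Schema Registry REST path, with an illustrative subject name:

```bash
verify_subject() {
    local subject=$1 max_attempts=5 attempt=1 wait_time
    while [[ $attempt -le $max_attempts ]]; do
        if curl -sf --max-time 5 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" >/dev/null; then
            return 0
        fi
        # Same exponential backoff as the mock test: 0.1s, 0.2s, 0.4s, 0.8s
        wait_time=$(echo "scale=3; 0.1 * (2 ^ ($attempt - 1))" | bc)
        sleep "$wait_time"
        attempt=$((attempt + 1))
    done
    return 1
}

# Example: verify_subject "loadtest-topic-0-value"
```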
diff --git a/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh b/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh
new file mode 100755
index 000000000..d2560728b
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh
@@ -0,0 +1,291 @@
+#!/bin/bash
+
+# Wait for SeaweedFS and Kafka Gateway services to be ready
+# This script checks service health and waits until all services are operational
+
+set -euo pipefail
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARNING]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Configuration
+TIMEOUT=${TIMEOUT:-300} # 5 minutes default timeout
+CHECK_INTERVAL=${CHECK_INTERVAL:-5} # Check every 5 seconds
+SEAWEEDFS_MASTER_URL=${SEAWEEDFS_MASTER_URL:-"http://localhost:9333"}
+KAFKA_GATEWAY_URL=${KAFKA_GATEWAY_URL:-"localhost:9093"}
+SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"}
+SEAWEEDFS_FILER_URL=${SEAWEEDFS_FILER_URL:-"http://localhost:8888"}
+
+# Check if a service is reachable
+check_http_service() {
+ local url=$1
+ local name=$2
+
+ if curl -sf "$url" >/dev/null 2>&1; then
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Check TCP port
+check_tcp_service() {
+ local host=$1
+ local port=$2
+ local name=$3
+
+ if timeout 3 bash -c "</dev/tcp/$host/$port" 2>/dev/null; then
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Check SeaweedFS Master
+check_seaweedfs_master() {
+ if check_http_service "$SEAWEEDFS_MASTER_URL/cluster/status" "SeaweedFS Master"; then
+ # Additional check: ensure cluster has volumes
+ local status_json
+ status_json=$(curl -s "$SEAWEEDFS_MASTER_URL/cluster/status" 2>/dev/null || echo "{}")
+
+ # Check if we have at least one volume server
+ if echo "$status_json" | grep -q '"Max":0'; then
+ log_warning "SeaweedFS Master is running but no volumes are available"
+ return 1
+ fi
+
+ return 0
+ fi
+ return 1
+}
+
+# Check SeaweedFS Filer
+check_seaweedfs_filer() {
+ check_http_service "$SEAWEEDFS_FILER_URL/" "SeaweedFS Filer"
+}
+
+# Check Kafka Gateway
+check_kafka_gateway() {
+    # Honor KAFKA_GATEWAY_URL (host:port) rather than hard-coding localhost:9093
+    local host="${KAFKA_GATEWAY_URL%%:*}"
+    local port="${KAFKA_GATEWAY_URL##*:}"
+ check_tcp_service "$host" "$port" "Kafka Gateway"
+}
+
+# Check Schema Registry
+check_schema_registry() {
+ # Check if Schema Registry container is running first
+ if ! docker compose ps schema-registry | grep -q "Up"; then
+ # Schema Registry is not running, which is okay for basic tests
+ return 0
+ fi
+
+ # FIXED: Wait for Docker healthcheck to report "healthy", not just "Up"
+ # Schema Registry has a 30s start_period, so we need to wait for the actual healthcheck
+ local health_status
+ health_status=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "none")
+
+ # If container has no healthcheck or healthcheck is not yet healthy, check HTTP directly
+ if [[ "$health_status" == "healthy" ]]; then
+ # Container reports healthy, do a final verification
+ if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then
+ return 0
+ fi
+ elif [[ "$health_status" == "starting" ]]; then
+ # Still in startup period, wait longer
+ return 1
+ elif [[ "$health_status" == "none" ]]; then
+ # No healthcheck defined (shouldn't happen), fall back to HTTP check
+ if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then
+ local subjects
+ subjects=$(curl -s "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null || echo "[]")
+
+ # Schema registry should at least return an empty array
+ if [[ "$subjects" == "[]" ]]; then
+ return 0
+ elif echo "$subjects" | grep -q '\['; then
+ return 0
+ else
+ log_warning "Schema Registry is not properly connected"
+ return 1
+ fi
+ fi
+ fi
+ return 1
+}
+
+# Check MQ Broker
+check_mq_broker() {
+ check_tcp_service "localhost" "17777" "SeaweedFS MQ Broker"
+}
+
+# Main health check function
+check_all_services() {
+ local all_healthy=true
+
+ log_info "Checking service health..."
+
+ # Check SeaweedFS Master
+ if check_seaweedfs_master; then
+ log_success "✓ SeaweedFS Master is healthy"
+ else
+ log_error "✗ SeaweedFS Master is not ready"
+ all_healthy=false
+ fi
+
+ # Check SeaweedFS Filer
+ if check_seaweedfs_filer; then
+ log_success "✓ SeaweedFS Filer is healthy"
+ else
+ log_error "✗ SeaweedFS Filer is not ready"
+ all_healthy=false
+ fi
+
+ # Check MQ Broker
+ if check_mq_broker; then
+ log_success "✓ SeaweedFS MQ Broker is healthy"
+ else
+ log_error "✗ SeaweedFS MQ Broker is not ready"
+ all_healthy=false
+ fi
+
+ # Check Kafka Gateway
+ if check_kafka_gateway; then
+ log_success "✓ Kafka Gateway is healthy"
+ else
+ log_error "✗ Kafka Gateway is not ready"
+ all_healthy=false
+ fi
+
+ # Check Schema Registry
+ if ! docker compose ps schema-registry | grep -q "Up"; then
+ log_warning "⚠ Schema Registry is stopped (skipping)"
+ elif check_schema_registry; then
+ log_success "✓ Schema Registry is healthy"
+ else
+ # Check if it's still starting up (healthcheck start_period)
+ local health_status
+ health_status=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "unknown")
+ if [[ "$health_status" == "starting" ]]; then
+ log_warning "⏳ Schema Registry is starting (waiting for healthcheck...)"
+ else
+ log_error "✗ Schema Registry is not ready (status: $health_status)"
+ fi
+ all_healthy=false
+ fi
+
+ $all_healthy
+}
+
+# Wait for all services to be ready
+wait_for_services() {
+ log_info "Waiting for all services to be ready (timeout: ${TIMEOUT}s)..."
+
+ local elapsed=0
+
+ while [[ $elapsed -lt $TIMEOUT ]]; do
+ if check_all_services; then
+ log_success "All services are ready! (took ${elapsed}s)"
+ return 0
+ fi
+
+ log_info "Some services are not ready yet. Waiting ${CHECK_INTERVAL}s... (${elapsed}/${TIMEOUT}s)"
+ sleep $CHECK_INTERVAL
+ elapsed=$((elapsed + CHECK_INTERVAL))
+ done
+
+ log_error "Services did not become ready within ${TIMEOUT} seconds"
+ log_error "Final service status:"
+ check_all_services
+
+ # Always dump Schema Registry diagnostics on timeout since it's the problematic service
+ log_error "==========================================="
+ log_error "Schema Registry Container Status:"
+ log_error "==========================================="
+ docker compose ps schema-registry 2>&1 || echo "Failed to get container status"
+ docker inspect loadtest-schema-registry --format='Health: {{.State.Health.Status}} ({{len .State.Health.Log}} checks)' 2>&1 || echo "Failed to inspect container"
+ log_error "==========================================="
+
+ log_error "Network Connectivity Check:"
+ log_error "==========================================="
+ log_error "Can Schema Registry reach Kafka Gateway?"
+ docker compose exec -T schema-registry ping -c 3 kafka-gateway 2>&1 || echo "Ping failed"
+ docker compose exec -T schema-registry nc -zv kafka-gateway 9093 2>&1 || echo "Port 9093 unreachable"
+ log_error "==========================================="
+
+ log_error "Schema Registry Logs (last 100 lines):"
+ log_error "==========================================="
+ docker compose logs --tail=100 schema-registry 2>&1 || echo "Failed to get Schema Registry logs"
+ log_error "==========================================="
+
+ log_error "Kafka Gateway Logs (last 50 lines with 'SR' prefix):"
+ log_error "==========================================="
+ docker compose logs --tail=200 kafka-gateway 2>&1 | grep -i "SR" | tail -50 || echo "No SR-related logs found in Kafka Gateway"
+ log_error "==========================================="
+
+ log_error "MQ Broker Logs (last 30 lines):"
+ log_error "==========================================="
+ docker compose logs --tail=30 seaweedfs-mq-broker 2>&1 || echo "Failed to get MQ Broker logs"
+ log_error "==========================================="
+
+ return 1
+}
+
+# Show current service status
+show_status() {
+ log_info "Current service status:"
+ check_all_services
+}
+
+# Main function
+main() {
+ case "${1:-wait}" in
+ "wait")
+ wait_for_services
+ ;;
+ "check")
+ show_status
+ ;;
+ "status")
+ show_status
+ ;;
+ *)
+ echo "Usage: $0 [wait|check|status]"
+ echo ""
+ echo "Commands:"
+ echo " wait - Wait for all services to be ready (default)"
+ echo " check - Check current service status"
+ echo " status - Same as check"
+ echo ""
+ echo "Environment variables:"
+ echo " TIMEOUT - Maximum time to wait in seconds (default: 300)"
+ echo " CHECK_INTERVAL - Check interval in seconds (default: 5)"
+ echo " SEAWEEDFS_MASTER_URL - Master URL (default: http://localhost:9333)"
+ echo " KAFKA_GATEWAY_URL - Gateway URL (default: localhost:9093)"
+ echo " SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)"
+ echo " SEAWEEDFS_FILER_URL - Filer URL (default: http://localhost:8888)"
+ exit 1
+ ;;
+ esac
+}
+
+main "$@"
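Typical standalone invocations, using the environment variables documented in the usage text above:

```bash
# Block until everything is healthy, allowing up to 10 minutes
TIMEOUT=600 CHECK_INTERVAL=10 ./scripts/wait-for-services.sh wait

# One-shot status report (does not wait)
./scripts/wait-for-services.sh check
```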
diff --git a/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
new file mode 100644
index 000000000..f511b4cf6
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
@@ -0,0 +1,290 @@
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.common.Node;
+
+import java.io.*;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+public class AdminClientDebugger {
+
+ public static void main(String[] args) throws Exception {
+ String broker = args.length > 0 ? args[0] : "localhost:9093";
+
+ System.out.println("=".repeat(80));
+ System.out.println("KAFKA ADMINCLIENT DEBUGGER");
+ System.out.println("=".repeat(80));
+ System.out.println("Target broker: " + broker);
+
+ // Test 1: Raw socket - capture exact bytes
+ System.out.println("\n" + "=".repeat(80));
+ System.out.println("TEST 1: Raw Socket - Capture ApiVersions Exchange");
+ System.out.println("=".repeat(80));
+ testRawSocket(broker);
+
+ // Test 2: AdminClient with detailed logging
+ System.out.println("\n" + "=".repeat(80));
+ System.out.println("TEST 2: AdminClient with Logging");
+ System.out.println("=".repeat(80));
+ testAdminClient(broker);
+ }
+
+ private static void testRawSocket(String broker) {
+ String[] parts = broker.split(":");
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+
+ try (Socket socket = new Socket(host, port)) {
+ socket.setSoTimeout(10000);
+
+ InputStream in = socket.getInputStream();
+ OutputStream out = socket.getOutputStream();
+
+ System.out.println("Connected to " + broker);
+
+ // Build ApiVersions request (v4)
+ // Format:
+ // [Size][ApiKey=18][ApiVersion=4][CorrelationId=0][ClientId][TaggedFields]
+ ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+
+ // ApiKey (2 bytes) = 18
+ requestBody.write(0);
+ requestBody.write(18);
+
+ // ApiVersion (2 bytes) = 4
+ requestBody.write(0);
+ requestBody.write(4);
+
+ // CorrelationId (4 bytes) = 0
+ requestBody.write(new byte[] { 0, 0, 0, 0 });
+
+ // ClientId (compact string) = "debug-client"
+ String clientId = "debug-client";
+ writeCompactString(requestBody, clientId);
+
+ // Tagged fields (empty)
+ requestBody.write(0x00);
+
+ byte[] request = requestBody.toByteArray();
+
+ // Write size
+ ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+ sizeBuffer.putInt(request.length);
+ out.write(sizeBuffer.array());
+
+ // Write request
+ out.write(request);
+ out.flush();
+
+ System.out.println("\nSENT ApiVersions v4 Request:");
+ System.out.println(" Size: " + request.length + " bytes");
+ hexDump(" Request", request, Math.min(64, request.length));
+
+ // Read response size
+ byte[] sizeBytes = new byte[4];
+ int read = in.read(sizeBytes);
+ if (read != 4) {
+ System.out.println("Failed to read response size (got " + read + " bytes)");
+ return;
+ }
+
+ int responseSize = ByteBuffer.wrap(sizeBytes).getInt();
+ System.out.println("\nRECEIVED Response:");
+ System.out.println(" Size: " + responseSize + " bytes");
+
+ // Read response body
+ byte[] responseBytes = new byte[responseSize];
+ int totalRead = 0;
+ while (totalRead < responseSize) {
+ int n = in.read(responseBytes, totalRead, responseSize - totalRead);
+ if (n == -1) {
+ System.out.println("Unexpected EOF after " + totalRead + " bytes");
+ return;
+ }
+ totalRead += n;
+ }
+
+ System.out.println(" Read complete response: " + totalRead + " bytes");
+
+ // Decode response
+ System.out.println("\nRESPONSE STRUCTURE:");
+ decodeApiVersionsResponse(responseBytes);
+
+ // Try to read more (should timeout or get EOF)
+ System.out.println("\n⏱️ Waiting for any additional data (10s timeout)...");
+ socket.setSoTimeout(10000);
+ try {
+ int nextByte = in.read();
+ if (nextByte == -1) {
+ System.out.println(" Server closed connection (EOF)");
+ } else {
+ System.out.println(" Unexpected data: " + nextByte);
+ }
+ } catch (SocketTimeoutException e) {
+ System.out.println(" Timeout - no additional data");
+ }
+
+ } catch (Exception e) {
+ System.out.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private static void testAdminClient(String broker) {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-client-debugger");
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
+
+ System.out.println("Creating AdminClient with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (AdminClient adminClient = AdminClient.create(props)) {
+ System.out.println("AdminClient created");
+
+ // Give the thread time to start
+ Thread.sleep(1000);
+
+ System.out.println("\nCalling describeCluster()...");
+ DescribeClusterResult result = adminClient.describeCluster();
+
+ System.out.println(" Waiting for nodes...");
+ Collection<Node> nodes = result.nodes().get();
+
+ System.out.println("Cluster description retrieved:");
+ System.out.println(" Nodes: " + nodes.size());
+ for (Node node : nodes) {
+ System.out.println(" - Node " + node.id() + ": " + node.host() + ":" + node.port());
+ }
+
+ System.out.println("\n Cluster ID: " + result.clusterId().get());
+
+ Node controller = result.controller().get();
+ if (controller != null) {
+ System.out.println(" Controller: Node " + controller.id());
+ }
+
+ } catch (ExecutionException e) {
+ System.out.println("Execution error: " + e.getCause().getMessage());
+ e.getCause().printStackTrace();
+ } catch (Exception e) {
+ System.out.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private static void decodeApiVersionsResponse(byte[] data) {
+ int offset = 0;
+
+ try {
+ // Correlation ID (4 bytes)
+ int correlationId = ByteBuffer.wrap(data, offset, 4).getInt();
+ System.out.println(" [Offset " + offset + "] Correlation ID: " + correlationId);
+ offset += 4;
+
+ // Header tagged fields (varint - should be 0x00 for flexible v3+)
+ int taggedFieldsLength = readUnsignedVarint(data, offset);
+ System.out.println(" [Offset " + offset + "] Header Tagged Fields Length: " + taggedFieldsLength);
+            offset += varintSize(data, offset);
+
+ // Error code (2 bytes)
+ short errorCode = ByteBuffer.wrap(data, offset, 2).getShort();
+ System.out.println(" [Offset " + offset + "] Error Code: " + errorCode);
+ offset += 2;
+
+ // API Keys array (compact array - varint length)
+ int apiKeysLength = readUnsignedVarint(data, offset) - 1; // Compact array: length+1
+ System.out.println(" [Offset " + offset + "] API Keys Count: " + apiKeysLength);
+            offset += varintSize(data, offset);
+
+ // Show first few API keys
+ System.out.println(" First 5 API Keys:");
+ for (int i = 0; i < Math.min(5, apiKeysLength); i++) {
+ short apiKey = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ short minVersion = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ short maxVersion = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ // Per-element tagged fields
+ int perElementTagged = readUnsignedVarint(data, offset);
+                offset += varintSize(data, offset);
+
+ System.out.println(" " + (i + 1) + ". API " + apiKey + ": v" + minVersion + "-v" + maxVersion);
+ }
+
+ System.out.println(" ... (showing first 5 of " + apiKeysLength + " APIs)");
+ System.out.println(" Response structure is valid!");
+
+ // Hex dump of first 64 bytes
+ hexDump("\n First 64 bytes", data, Math.min(64, data.length));
+
+ } catch (Exception e) {
+ System.out.println(" Failed to decode at offset " + offset + ": " + e.getMessage());
+ hexDump(" Raw bytes", data, Math.min(128, data.length));
+ }
+ }
+
+ private static int readUnsignedVarint(byte[] data, int offset) {
+ int value = 0;
+ int shift = 0;
+ while (true) {
+ byte b = data[offset++];
+ value |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0)
+ break;
+ shift += 7;
+ }
+ return value;
+ }
+
+    // Number of bytes the varint starting at the given offset occupies.
+    // A varint ends at the first byte whose continuation bit (0x80) is clear.
+    private static int varintSize(byte[] data, int offset) {
+        int size = 1;
+        while ((data[offset] & 0x80) != 0) {
+            offset++;
+            size++;
+        }
+        return size;
+    }
+
+ private static void writeCompactString(ByteArrayOutputStream out, String str) {
+ byte[] bytes = str.getBytes();
+ writeUnsignedVarint(out, bytes.length + 1); // Compact string: length+1
+ out.write(bytes, 0, bytes.length);
+ }
+
+ private static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
+ while ((value & ~0x7F) != 0) {
+ out.write((byte) ((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ out.write((byte) value);
+ }
+
+ private static void hexDump(String label, byte[] data, int length) {
+ System.out.println(label + " (hex dump):");
+ for (int i = 0; i < length; i += 16) {
+ System.out.printf(" %04x ", i);
+ for (int j = 0; j < 16; j++) {
+ if (i + j < length) {
+ System.out.printf("%02x ", data[i + j] & 0xFF);
+ } else {
+ System.out.print(" ");
+ }
+ if (j == 7)
+ System.out.print(" ");
+ }
+ System.out.print(" |");
+ for (int j = 0; j < 16 && i + j < length; j++) {
+ byte b = data[i + j];
+ System.out.print((b >= 32 && b < 127) ? (char) b : '.');
+ }
+ System.out.println("|");
+ }
+ }
+}
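The debugger only needs the Kafka clients library on the classpath. Assuming the dependencies declared in tools/pom.xml are used (the exact layout below is illustrative, not something the patch prescribes), a manual build-and-run could look like:

```bash
cd test/kafka/kafka-client-loadtest/tools

# Copy the declared dependencies into target/dependency
mvn -q dependency:copy-dependencies

# Compile and run against the gateway
javac -cp "target/dependency/*" AdminClientDebugger.java
java  -cp ".:target/dependency/*" AdminClientDebugger localhost:9093
```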
diff --git a/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java b/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java
new file mode 100644
index 000000000..177a86233
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java
@@ -0,0 +1,72 @@
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class JavaAdminClientTest {
+ public static void main(String[] args) {
+ // Set uncaught exception handler to catch AdminClient thread errors
+ Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+ System.err.println("UNCAUGHT EXCEPTION in thread " + t.getName() + ":");
+ e.printStackTrace();
+ });
+
+ String bootstrapServers = args.length > 0 ? args[0] : "localhost:9093";
+
+ System.out.println("Testing Kafka wire protocol with broker: " + bootstrapServers);
+
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-admin-test");
+ props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000);
+ props.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 30000);
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+ props.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
+ props.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
+
+ System.out.println("Creating AdminClient with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (AdminClient adminClient = AdminClient.create(props)) {
+ System.out.println("AdminClient created successfully");
+ Thread.sleep(2000); // Give it time to initialize
+
+ // Test 1: Describe Cluster (uses Metadata API internally)
+ System.out.println("\n=== Test 1: Describe Cluster ===");
+ try {
+ DescribeClusterResult clusterResult = adminClient.describeCluster();
+ String clusterId = clusterResult.clusterId().get(10, TimeUnit.SECONDS);
+ int nodeCount = clusterResult.nodes().get(10, TimeUnit.SECONDS).size();
+ System.out.println("Cluster ID: " + clusterId);
+ System.out.println("Nodes: " + nodeCount);
+ } catch (Exception e) {
+ System.err.println("Describe Cluster failed: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ // Test 2: List Topics
+ System.out.println("\n=== Test 2: List Topics ===");
+ try {
+ ListTopicsResult topicsResult = adminClient.listTopics();
+ int topicCount = topicsResult.names().get(10, TimeUnit.SECONDS).size();
+ System.out.println("Topics: " + topicCount);
+ } catch (Exception e) {
+ System.err.println("List Topics failed: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ System.out.println("\nAll tests completed!");
+
+ } catch (Exception e) {
+ System.err.println("AdminClient creation failed: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java b/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java
new file mode 100644
index 000000000..41c884544
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java
@@ -0,0 +1,82 @@
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+public class JavaKafkaConsumer {
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println("Usage: java JavaKafkaConsumer <broker> <topic>");
+ System.exit(1);
+ }
+
+ String broker = args[0];
+ String topic = args[1];
+
+ System.out.println("Connecting to Kafka broker: " + broker);
+ System.out.println("Topic: " + topic);
+
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "java-test-group");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
+ props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
+ props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList(topic));
+
+ System.out.println("Starting to consume messages...");
+
+ int messageCount = 0;
+ int errorCount = 0;
+ long startTime = System.currentTimeMillis();
+
+ try {
+ while (true) {
+ try {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+ for (ConsumerRecord<String, String> record : records) {
+ messageCount++;
+ System.out.printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s%n",
+ messageCount, record.topic(), record.partition(), record.offset(),
+ record.key(), record.value());
+ }
+
+ // Stop after 100 messages or 60 seconds
+ if (messageCount >= 100 || (System.currentTimeMillis() - startTime) > 60000) {
+ long duration = System.currentTimeMillis() - startTime;
+ System.out.printf("%nSuccessfully consumed %d messages in %dms%n", messageCount, duration);
+ System.out.printf("Success rate: %.1f%% (%d/%d including errors)%n",
+ (double) messageCount / (messageCount + errorCount) * 100, messageCount,
+ messageCount + errorCount);
+ break;
+ }
+ } catch (Exception e) {
+ errorCount++;
+ System.err.printf("Error during poll #%d: %s%n", errorCount, e.getMessage());
+ e.printStackTrace();
+
+                    // Stop after more than 10 errors in total or 60 seconds
+ if (errorCount > 10 || (System.currentTimeMillis() - startTime) > 60000) {
+ long duration = System.currentTimeMillis() - startTime;
+ System.err.printf("%nStopping after %d errors in %dms%n", errorCount, duration);
+ break;
+ }
+ }
+ }
+ } finally {
+ consumer.close();
+ }
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java b/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java
new file mode 100644
index 000000000..e9898d5f0
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java
@@ -0,0 +1,68 @@
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+public class JavaProducerTest {
+ public static void main(String[] args) {
+ String bootstrapServers = args.length > 0 ? args[0] : "localhost:9093";
+ String topicName = args.length > 1 ? args[1] : "test-topic";
+
+ System.out.println("Testing Kafka Producer with broker: " + bootstrapServers);
+ System.out.println(" Topic: " + topicName);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "java-producer-test");
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+
+ System.out.println("Creating Producer with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+ System.out.println("Producer created successfully");
+
+ // Try to send a test message
+ System.out.println("\n=== Test: Send Message ===");
+ try {
+ ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key1", "value1");
+ System.out.println("Sending record to topic: " + topicName);
+ Future<RecordMetadata> future = producer.send(record);
+
+ RecordMetadata metadata = future.get(); // This will block and wait for response
+ System.out.println("Message sent successfully!");
+ System.out.println(" Topic: " + metadata.topic());
+ System.out.println(" Partition: " + metadata.partition());
+ System.out.println(" Offset: " + metadata.offset());
+ } catch (Exception e) {
+ System.err.println("Send failed: " + e.getMessage());
+ e.printStackTrace();
+
+ // Print cause chain
+ Throwable cause = e.getCause();
+ int depth = 1;
+ while (cause != null && depth < 5) {
+ System.err.println(
+ " Cause " + depth + ": " + cause.getClass().getName() + ": " + cause.getMessage());
+ cause = cause.getCause();
+ depth++;
+ }
+ }
+
+ System.out.println("\nTest completed!");
+
+ } catch (Exception e) {
+ System.err.println("Producer creation or operation failed: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
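After compiling as above, the producer and consumer tools can be chained into a small round-trip smoke test; the topic name and classpath layout are assumptions, not mandated by the patch:

```bash
CP=".:target/dependency/*"

# Produce one record, then read it back with the standalone consumer
java -cp "$CP" JavaProducerTest  localhost:9093 loadtest-topic-0
java -cp "$CP" JavaKafkaConsumer localhost:9093 loadtest-topic-0
```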
diff --git a/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java b/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java
new file mode 100644
index 000000000..3c33ae0ea
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java
@@ -0,0 +1,124 @@
+package tools;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+public class SchemaRegistryTest {
+ private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
+
+ public static void main(String[] args) {
+ System.out.println("================================================================================");
+ System.out.println("Schema Registry Test - Verifying In-Memory Read Optimization");
+ System.out.println("================================================================================\n");
+
+ SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 100);
+ boolean allTestsPassed = true;
+
+ try {
+ // Test 1: Register first schema
+ System.out.println("Test 1: Registering first schema (user-value)...");
+ Schema userValueSchema = SchemaBuilder
+ .record("User").fields()
+ .requiredString("name")
+ .requiredInt("age")
+ .endRecord();
+
+ long startTime = System.currentTimeMillis();
+ int schema1Id = schemaRegistry.register("user-value", userValueSchema);
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("✓ SUCCESS: Schema registered with ID: " + schema1Id + " (took " + elapsedTime + "ms)");
+
+ // Test 2: Register second schema immediately (tests read-after-write)
+ System.out.println("\nTest 2: Registering second schema immediately (user-key)...");
+ Schema userKeySchema = SchemaBuilder
+ .record("UserKey").fields()
+ .requiredString("userId")
+ .endRecord();
+
+ startTime = System.currentTimeMillis();
+ int schema2Id = schemaRegistry.register("user-key", userKeySchema);
+ elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("✓ SUCCESS: Schema registered with ID: " + schema2Id + " (took " + elapsedTime + "ms)");
+
+ // Test 3: Rapid fire registrations (tests concurrent writes)
+ System.out.println("\nTest 3: Rapid fire registrations (10 schemas in parallel)...");
+ startTime = System.currentTimeMillis();
+ Thread[] threads = new Thread[10];
+ final boolean[] results = new boolean[10];
+
+ for (int i = 0; i < 10; i++) {
+ final int index = i;
+ threads[i] = new Thread(() -> {
+ try {
+ Schema schema = SchemaBuilder
+ .record("Test" + index).fields()
+ .requiredString("field" + index)
+ .endRecord();
+ schemaRegistry.register("test-" + index + "-value", schema);
+ results[index] = true;
+ } catch (Exception e) {
+ System.err.println("✗ ERROR in thread " + index + ": " + e.getMessage());
+ results[index] = false;
+ }
+ });
+ threads[i].start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ elapsedTime = System.currentTimeMillis() - startTime;
+ int successCount = 0;
+ for (boolean result : results) {
+ if (result) successCount++;
+ }
+
+ if (successCount == 10) {
+ System.out.println("✓ SUCCESS: All 10 schemas registered (took " + elapsedTime + "ms total, ~" + (elapsedTime / 10) + "ms per schema)");
+ } else {
+ System.out.println("✗ PARTIAL FAILURE: Only " + successCount + "/10 schemas registered");
+ allTestsPassed = false;
+ }
+
+ // Test 4: Verify we can retrieve all schemas
+ System.out.println("\nTest 4: Verifying all schemas are retrievable...");
+ startTime = System.currentTimeMillis();
+ Schema retrieved1 = schemaRegistry.getById(schema1Id);
+ Schema retrieved2 = schemaRegistry.getById(schema2Id);
+ elapsedTime = System.currentTimeMillis() - startTime;
+
+ if (retrieved1.equals(userValueSchema) && retrieved2.equals(userKeySchema)) {
+ System.out.println("✓ SUCCESS: All schemas retrieved correctly (took " + elapsedTime + "ms)");
+ } else {
+ System.out.println("✗ FAILURE: Schema mismatch");
+ allTestsPassed = false;
+ }
+
+ // Summary
+ System.out.println("\n===============================================================================");
+ if (allTestsPassed) {
+ System.out.println("✓ ALL TESTS PASSED!");
+ System.out.println("===============================================================================");
+ System.out.println("\nOptimization verified:");
+ System.out.println("- ForceFlush is NO LONGER NEEDED");
+ System.out.println("- Subscribers read from in-memory buffer using IsOffsetInMemory()");
+ System.out.println("- Per-subscriber notification channels provide instant wake-up");
+ System.out.println("- True concurrent writes without serialization");
+ System.exit(0);
+ } else {
+ System.out.println("✗ SOME TESTS FAILED");
+ System.out.println("===============================================================================");
+ System.exit(1);
+ }
+
+ } catch (Exception e) {
+ System.err.println("\n✗ FATAL ERROR: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
+
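The summary printed by the test above attributes the speed-up to subscribers reading straight from the broker's in-memory buffer (via IsOffsetInMemory()) and being woken through per-subscriber notification channels instead of a forced flush. The broker code is not part of this change, so the Go sketch below only illustrates that general pattern in miniature; the inMemoryLog type, its fields, and its helper methods are hypothetical names, not SeaweedFS APIs.

package main

import (
	"fmt"
	"sync"
)

type inMemoryLog struct {
	mu          sync.Mutex
	firstOffset int64         // oldest offset still held in memory (hypothetical layout)
	entries     [][]byte      // entries[i] is the record at offset firstOffset+i
	notify      chan struct{} // per-subscriber wake-up, buffered with capacity 1
}

// IsOffsetInMemory reports whether the offset can be served from the buffer.
func (l *inMemoryLog) IsOffsetInMemory(offset int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return offset >= l.firstOffset && offset < l.firstOffset+int64(len(l.entries))
}

func (l *inMemoryLog) append(entry []byte) {
	l.mu.Lock()
	l.entries = append(l.entries, entry)
	l.mu.Unlock()
	select {
	case l.notify <- struct{}{}: // wake the subscriber without blocking the writer
	default: // a wake-up is already pending; dropping this one is safe
	}
}

func (l *inMemoryLog) read(offset int64) []byte {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.entries[offset-l.firstOffset]
}

func main() {
	logBuf := &inMemoryLog{notify: make(chan struct{}, 1)}

	// Writer: appends three records, as the schema registrations above would.
	go func() {
		for i := 0; i < 3; i++ {
			logBuf.append([]byte(fmt.Sprintf("schema-%d", i)))
		}
	}()

	// Subscriber: serves reads from memory when possible, otherwise waits on
	// its notification channel rather than forcing a flush to disk.
	for offset := int64(0); offset < 3; {
		if logBuf.IsOffsetInMemory(offset) {
			fmt.Printf("offset %d: %s\n", offset, logBuf.read(offset))
			offset++
			continue
		}
		<-logBuf.notify
	}
}

The capacity-1 buffered channel is the detail that makes this pattern safe: a notification sent between the subscriber's IsOffsetInMemory check and its receive is retained rather than lost, so the subscriber never sleeps through an append and the writer never blocks on a slow reader.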
diff --git a/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java b/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java
new file mode 100644
index 000000000..f334c045a
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java
@@ -0,0 +1,78 @@
+import java.net.*;
+import java.nio.*;
+import java.nio.channels.*;
+
+public class TestSocketReadiness {
+ public static void main(String[] args) throws Exception {
+ String host = args.length > 0 ? args[0] : "localhost";
+ int port = args.length > 1 ? Integer.parseInt(args[1]) : 9093;
+
+ System.out.println("Testing socket readiness with " + host + ":" + port);
+
+ // Test 1: Simple blocking connect
+ System.out.println("\n=== Test 1: Blocking Socket ===");
+ try (Socket socket = new Socket()) {
+ socket.connect(new InetSocketAddress(host, port), 5000);
+ System.out.println("Blocking socket connected");
+ System.out.println(" Available bytes: " + socket.getInputStream().available());
+ Thread.sleep(100);
+ System.out.println(" Available bytes after 100ms: " + socket.getInputStream().available());
+ } catch (Exception e) {
+ System.err.println("Blocking socket failed: " + e.getMessage());
+ }
+
+ // Test 2: Non-blocking NIO socket (like Kafka client uses)
+ System.out.println("\n=== Test 2: Non-blocking NIO Socket ===");
+ Selector selector = Selector.open();
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+
+ try {
+ boolean connected = channel.connect(new InetSocketAddress(host, port));
+ System.out.println(" connect() returned: " + connected);
+
+ SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
+
+ int ready = selector.select(5000);
+ System.out.println(" selector.select() returned: " + ready);
+
+ if (ready > 0) {
+ for (SelectionKey k : selector.selectedKeys()) {
+ if (k.isConnectable()) {
+ System.out.println(" isConnectable: true");
+ boolean finished = channel.finishConnect();
+ System.out.println(" finishConnect() returned: " + finished);
+
+ if (finished) {
+ k.interestOps(SelectionKey.OP_READ);
+
+                            // Check whether the socket reports readable immediately after connect; spurious readiness here is the suspected client-side problem
+ selector.selectedKeys().clear();
+ int readReady = selector.selectNow();
+ System.out.println(" Immediately after connect, selectNow() = " + readReady);
+
+ if (readReady > 0) {
+ System.out.println(" Socket is IMMEDIATELY readable (unexpected!)");
+ ByteBuffer buf = ByteBuffer.allocate(1);
+ int bytesRead = channel.read(buf);
+ System.out.println(" read() returned: " + bytesRead);
+ } else {
+ System.out.println(" Socket is NOT immediately readable (correct)");
+ }
+ }
+ }
+ }
+ }
+
+ System.out.println("NIO socket test completed");
+ } catch (Exception e) {
+ System.err.println("NIO socket failed: " + e.getMessage());
+ e.printStackTrace();
+ } finally {
+ channel.close();
+ selector.close();
+ }
+
+ System.out.println("\nAll tests completed");
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/tools/go.mod b/test/kafka/kafka-client-loadtest/tools/go.mod
new file mode 100644
index 000000000..c63d94230
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/go.mod
@@ -0,0 +1,10 @@
+module simple-test
+
+go 1.24.7
+
+require github.com/segmentio/kafka-go v0.4.49
+
+require (
+ github.com/klauspost/compress v1.15.9 // indirect
+ github.com/pierrec/lz4/v4 v4.1.15 // indirect
+)
diff --git a/test/kafka/kafka-client-loadtest/tools/go.sum b/test/kafka/kafka-client-loadtest/tools/go.sum
new file mode 100644
index 000000000..74b476c2d
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/go.sum
@@ -0,0 +1,24 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk=
+github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
+golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
+golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
+golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
new file mode 100644
index 000000000..1da40c89f
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
@@ -0,0 +1,69 @@
+package main
+
+import (
+ "context"
+ "log"
+ "os"
+ "time"
+
+ "github.com/segmentio/kafka-go"
+)
+
+func main() {
+ if len(os.Args) < 3 {
+ log.Fatal("Usage: kafka-go-consumer <broker> <topic>")
+ }
+ broker := os.Args[1]
+ topic := os.Args[2]
+
+ log.Printf("Connecting to Kafka broker: %s", broker)
+ log.Printf("Topic: %s", topic)
+
+ // Create a new reader
+ r := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: []string{broker},
+ Topic: topic,
+ GroupID: "kafka-go-test-group",
+ MinBytes: 1,
+ MaxBytes: 10e6, // 10MB
+ MaxWait: 1 * time.Second,
+ })
+ defer r.Close()
+
+ log.Printf("Starting to consume messages...")
+
+ ctx := context.Background()
+	messageCount := 0
+	errorCount := 0  // consecutive errors since the last successful read
+	totalErrors := 0 // every error seen during the run, for the final stats
+	startTime := time.Now()
+
+ for {
+ m, err := r.ReadMessage(ctx)
+ if err != nil {
+			errorCount++
+			totalErrors++
+			log.Printf("Error reading message #%d: %v", messageCount+1, err)
+
+			// Stop after more than 10 consecutive errors or after 60 seconds
+			if errorCount > 10 || time.Since(startTime) > 60*time.Second {
+				log.Printf("\nStopping after %d consecutive errors (%d total) in %v", errorCount, totalErrors, time.Since(startTime))
+ break
+ }
+ continue
+ }
+
+		// Reset the consecutive error count on a successful read
+		errorCount = 0
+ messageCount++
+
+ log.Printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s",
+ messageCount, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
+
+ // Stop after 100 messages or 60 seconds
+ if messageCount >= 100 || time.Since(startTime) > 60*time.Second {
+ log.Printf("\nSuccessfully consumed %d messages in %v", messageCount, time.Since(startTime))
+			log.Printf("Success rate: %.1f%% (%d/%d including errors)",
+				float64(messageCount)/float64(messageCount+totalErrors)*100, messageCount, messageCount+totalErrors)
+ break
+ }
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/tools/log4j.properties b/test/kafka/kafka-client-loadtest/tools/log4j.properties
new file mode 100644
index 000000000..ed0cd0fe5
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/log4j.properties
@@ -0,0 +1,12 @@
+log4j.rootLogger=DEBUG, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c: %m%n
+
+# More verbose for Kafka client
+log4j.logger.org.apache.kafka=DEBUG
+log4j.logger.org.apache.kafka.clients=TRACE
+log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE
+
+
diff --git a/test/kafka/kafka-client-loadtest/tools/pom.xml b/test/kafka/kafka-client-loadtest/tools/pom.xml
new file mode 100644
index 000000000..58a858e95
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.seaweedfs.test</groupId>
+ <artifactId>kafka-consumer-test</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ <kafka.version>3.9.1</kafka.version>
+ <confluent.version>7.6.0</confluent.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-schema-registry-client</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>2.0.9</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.11.0</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <mainClass>tools.SchemaRegistryTest</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
+
diff --git a/test/kafka/kafka-client-loadtest/tools/simple-test b/test/kafka/kafka-client-loadtest/tools/simple-test
new file mode 100755
index 000000000..47eef7386
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/simple-test
Binary files differ
diff --git a/test/kafka/kafka-client-loadtest/verify_schema_formats.sh b/test/kafka/kafka-client-loadtest/verify_schema_formats.sh
new file mode 100755
index 000000000..6ded75b33
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/verify_schema_formats.sh
@@ -0,0 +1,63 @@
+#!/bin/bash
+# Verify schema format distribution across topics
+
+set -e
+
+SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-http://localhost:8081}"
+TOPIC_PREFIX="${TOPIC_PREFIX:-loadtest-topic}"
+TOPIC_COUNT="${TOPIC_COUNT:-5}"
+
+echo "================================"
+echo "Schema Format Verification"
+echo "================================"
+echo ""
+echo "Schema Registry: $SCHEMA_REGISTRY_URL"
+echo "Topic Prefix: $TOPIC_PREFIX"
+echo "Topic Count: $TOPIC_COUNT"
+echo ""
+
+echo "Registered Schemas:"
+echo "-------------------"
+
+for i in $(seq 0 $((TOPIC_COUNT-1))); do
+ topic="${TOPIC_PREFIX}-${i}"
+ subject="${topic}-value"
+
+ echo -n "Topic $i ($topic): "
+
+ # Try to get schema
+    response=$(curl -s "${SCHEMA_REGISTRY_URL}/subjects/${subject}/versions/latest" 2>/dev/null || echo '{"error_code": 404, "message": "not found"}')
+
+    if echo "$response" | grep -q '"error_code"'; then
+ echo "❌ NOT REGISTERED"
+ else
+ schema_type=$(echo "$response" | grep -o '"schemaType":"[^"]*"' | cut -d'"' -f4)
+ schema_id=$(echo "$response" | grep -o '"id":[0-9]*' | cut -d':' -f2)
+
+ if [ -z "$schema_type" ]; then
+ schema_type="AVRO" # Default if not specified
+ fi
+
+ # Expected format based on index
+ if [ $((i % 2)) -eq 0 ]; then
+ expected="AVRO"
+ else
+ expected="JSON"
+ fi
+
+ if [ "$schema_type" = "$expected" ]; then
+ echo "✅ $schema_type (ID: $schema_id) - matches expected"
+ else
+ echo "⚠️ $schema_type (ID: $schema_id) - expected $expected"
+ fi
+ fi
+done
+
+echo ""
+echo "Expected Distribution:"
+echo "----------------------"
+echo "Even indices (0, 2, 4, ...): AVRO"
+echo "Odd indices (1, 3, 5, ...): JSON"
+echo ""
+
+
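The even/odd convention listed above implies that the producer side selects a schema format from the topic's numeric suffix. As a reference point only, here is a minimal Go sketch of that mapping; the function name and the hard-coded topic prefix are illustrative and not taken from the load-test producer's actual code.

package main

import "fmt"

// schemaFormatForTopic returns the schema type expected for loadtest-topic-<index>,
// matching the even=AVRO / odd=JSON convention the verification script checks.
func schemaFormatForTopic(index int) string {
	if index%2 == 0 {
		return "AVRO"
	}
	return "JSON"
}

func main() {
	for i := 0; i < 5; i++ {
		subject := fmt.Sprintf("loadtest-topic-%d-value", i)
		fmt.Printf("%s -> %s\n", subject, schemaFormatForTopic(i))
	}
}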