Diffstat (limited to 'test/kafka/kafka-client-loadtest/scripts')
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/register-schemas.sh    423
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh        480
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh    352
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh    151
-rwxr-xr-x  test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh   291
5 files changed, 1697 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh b/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh
new file mode 100755
index 000000000..58cb0f114
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/register-schemas.sh
@@ -0,0 +1,423 @@
+#!/bin/bash
+
+# Register schemas with Schema Registry for load testing
+# This script registers the necessary schemas before running load tests
+
+set -euo pipefail
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARN]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Configuration
+SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"}
+TIMEOUT=${TIMEOUT:-60}
+CHECK_INTERVAL=${CHECK_INTERVAL:-2}
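+
+# Example invocation with overrides (the hostname here is illustrative):
+#   SCHEMA_REGISTRY_URL=http://schema-registry.local:8081 TIMEOUT=120 \
+#     ./scripts/register-schemas.sh register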
+
+# Wait for Schema Registry to be ready
+wait_for_schema_registry() {
+ log_info "Waiting for Schema Registry to be ready..."
+
+ local elapsed=0
+ while [[ $elapsed -lt $TIMEOUT ]]; do
+ if curl -sf --max-time 5 "$SCHEMA_REGISTRY_URL/subjects" >/dev/null 2>&1; then
+ log_success "Schema Registry is ready!"
+ return 0
+ fi
+
+ log_info "Schema Registry not ready yet. Waiting ${CHECK_INTERVAL}s... (${elapsed}/${TIMEOUT}s)"
+ sleep $CHECK_INTERVAL
+ elapsed=$((elapsed + CHECK_INTERVAL))
+ done
+
+ log_error "Schema Registry did not become ready within ${TIMEOUT} seconds"
+ return 1
+}
+
+# Register a schema for a subject
+register_schema() {
+ local subject=$1
+ local schema=$2
+ local schema_type=${3:-"AVRO"}
+ local max_attempts=5
+ local attempt=1
+
+ log_info "Registering schema for subject: $subject"
+
+    # Create the schema registration payload (printf avoids echo's escape
+    # handling; declaring before assigning keeps a jq failure from being
+    # masked by local's exit status)
+    local escaped_schema payload
+    escaped_schema=$(printf '%s' "$schema" | jq -Rs .)
+    payload=$(cat <<EOF
+{
+  "schema": $escaped_schema,
+  "schemaType": "$schema_type"
+}
+EOF
+)
+
+ while [[ $attempt -le $max_attempts ]]; do
+ # Register the schema (with 30 second timeout)
+ local response
+ response=$(curl -s --max-time 30 -X POST \
+ -H "Content-Type: application/vnd.schemaregistry.v1+json" \
+ -d "$payload" \
+ "$SCHEMA_REGISTRY_URL/subjects/$subject/versions" 2>/dev/null)
+
+ if echo "$response" | jq -e '.id' >/dev/null 2>&1; then
+ local schema_id
+ schema_id=$(echo "$response" | jq -r '.id')
+ if [[ $attempt -gt 1 ]]; then
+ log_success "- Schema registered for $subject with ID: $schema_id [attempt $attempt]"
+ else
+ log_success "- Schema registered for $subject with ID: $schema_id"
+ fi
+ return 0
+ fi
+
+ # Check if it's a consumer lag timeout (error_code 50002)
+ local error_code
+ error_code=$(echo "$response" | jq -r '.error_code // empty' 2>/dev/null)
+
+ if [[ "$error_code" == "50002" && $attempt -lt $max_attempts ]]; then
+ # Consumer lag timeout - wait longer for consumer to catch up
+ # Use exponential backoff: 1s, 2s, 4s, 8s
+ local wait_time=$(echo "2 ^ ($attempt - 1)" | bc)
+ log_warning "Schema Registry consumer lag detected for $subject, waiting ${wait_time}s before retry (attempt $attempt)..."
+ sleep "$wait_time"
+ attempt=$((attempt + 1))
+ else
+ # Other error or max attempts reached
+ log_error "x Failed to register schema for $subject"
+ log_error "Response: $response"
+ return 1
+ fi
+ done
+
+ return 1
+}
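+
+# For reference, the equivalent manual registration call (same API the
+# function above uses; the subject name is illustrative and the returned id
+# depends on registry state):
+#   curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
+#        -d '{"schema": "\"string\""}' \
+#        "$SCHEMA_REGISTRY_URL/subjects/demo-value/versions"
+#   => {"id":1}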
+
+# Verify a schema exists (single attempt)
+verify_schema() {
+ local subject=$1
+
+ local response
+ response=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" 2>/dev/null)
+
+ if echo "$response" | jq -e '.id' >/dev/null 2>&1; then
+ local schema_id
+ local version
+ schema_id=$(echo "$response" | jq -r '.id')
+ version=$(echo "$response" | jq -r '.version')
+ log_success "- Schema verified for $subject (ID: $schema_id, Version: $version)"
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Verify a schema exists with retry logic (handles Schema Registry consumer lag)
+verify_schema_with_retry() {
+ local subject=$1
+ local max_attempts=10
+ local attempt=1
+
+ log_info "Verifying schema for subject: $subject"
+
+ while [[ $attempt -le $max_attempts ]]; do
+ local response
+ response=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects/$subject/versions/latest" 2>/dev/null)
+
+ if echo "$response" | jq -e '.id' >/dev/null 2>&1; then
+ local schema_id
+ local version
+ schema_id=$(echo "$response" | jq -r '.id')
+ version=$(echo "$response" | jq -r '.version')
+
+ if [[ $attempt -gt 1 ]]; then
+ log_success "- Schema verified for $subject (ID: $schema_id, Version: $version) [attempt $attempt]"
+ else
+ log_success "- Schema verified for $subject (ID: $schema_id, Version: $version)"
+ fi
+ return 0
+ fi
+
+ # Schema not found, wait and retry (handles Schema Registry consumer lag)
+ if [[ $attempt -lt $max_attempts ]]; then
+            # Linear backoff for Schema Registry consumer lag: 0.5s, 1.0s, 1.5s, 2.0s, ...
+            local wait_time
+            wait_time=$(echo "scale=1; 0.5 * $attempt" | bc)
+ sleep "$wait_time"
+ attempt=$((attempt + 1))
+ else
+ log_error "x Schema not found for $subject (tried $max_attempts times)"
+ return 1
+ fi
+ done
+
+ return 1
+}
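+
+# Worst case, the retry loop above sleeps after attempts 1-9 (0.5s * attempt),
+# i.e. 0.5 * (1+2+...+9) = 22.5s of total backoff before giving up.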
+
+# Register load test schemas (optimized for batch registration)
+register_loadtest_schemas() {
+ log_info "Registering load test schemas with multiple formats..."
+
+ # Define the Avro schema for load test messages
+ local avro_value_schema='{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }'
+
+ # Define the JSON schema for load test messages
+ local json_value_schema='{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "LoadTestMessage",
+ "type": "object",
+ "properties": {
+ "id": {"type": "string"},
+ "timestamp": {"type": "integer"},
+ "producer_id": {"type": "integer"},
+ "counter": {"type": "integer"},
+ "user_id": {"type": "string"},
+ "event_type": {"type": "string"},
+ "properties": {
+ "type": "object",
+ "additionalProperties": {"type": "string"}
+ }
+ },
+ "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"]
+ }'
+
+ # Define the Protobuf schema for load test messages
+ local protobuf_value_schema='syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+message LoadTestMessage {
+ string id = 1;
+ int64 timestamp = 2;
+ int32 producer_id = 3;
+ int64 counter = 4;
+ string user_id = 5;
+ string event_type = 6;
+ map<string, string> properties = 7;
+}'
+
+ # Define the key schema (simple string)
+ local avro_key_schema='{"type": "string"}'
+ local json_key_schema='{"type": "string"}'
+ local protobuf_key_schema='syntax = "proto3"; message Key { string key = 1; }'
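+
+    # For reference, a message instance matching the value schemas above would
+    # look like this (JSON encoding shown; field values are illustrative):
+    #   {"id": "msg-001", "timestamp": 1700000000000, "producer_id": 1,
+    #    "counter": 42, "user_id": "user-7", "event_type": "click",
+    #    "properties": {"region": "us-east"}}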
+
+ # Register schemas for all load test topics with different formats
+ local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4")
+ local success_count=0
+ local total_schemas=0
+
+ # Distribute formats: topic-0=AVRO, topic-1=JSON, topic-2=PROTOBUF, topic-3=AVRO, topic-4=JSON
+ local idx=0
+ for topic in "${topics[@]}"; do
+ local format
+ local value_schema
+ local key_schema
+
+ # Determine format based on topic index (same as producer logic)
+ case $((idx % 3)) in
+ 0)
+ format="AVRO"
+ value_schema="$avro_value_schema"
+ key_schema="$avro_key_schema"
+ ;;
+ 1)
+ format="JSON"
+ value_schema="$json_value_schema"
+ key_schema="$json_key_schema"
+ ;;
+ 2)
+ format="PROTOBUF"
+ value_schema="$protobuf_value_schema"
+ key_schema="$protobuf_key_schema"
+ ;;
+ esac
+
+ log_info "Registering $topic with $format schema..."
+
+ # Register value schema
+ if register_schema "${topic}-value" "$value_schema" "$format"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+
+ # Small delay to let Schema Registry consumer process (prevents consumer lag)
+ sleep 0.2
+
+ # Register key schema
+ if register_schema "${topic}-key" "$key_schema" "$format"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+
+ # Small delay to let Schema Registry consumer process (prevents consumer lag)
+ sleep 0.2
+
+ idx=$((idx + 1))
+ done
+
+ log_info "Schema registration summary: $success_count/$total_schemas schemas registered successfully"
+ log_info "Format distribution: topic-0=AVRO, topic-1=JSON, topic-2=PROTOBUF, topic-3=AVRO, topic-4=JSON"
+
+ if [[ $success_count -eq $total_schemas ]]; then
+ log_success "All load test schemas registered successfully with multiple formats!"
+ return 0
+ else
+ log_error "Some schemas failed to register"
+ return 1
+ fi
+}
+
+# Verify all schemas are registered
+verify_loadtest_schemas() {
+ log_info "Verifying load test schemas..."
+
+ local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4")
+ local success_count=0
+ local total_schemas=0
+
+ for topic in "${topics[@]}"; do
+ # Verify value schema with retry (handles Schema Registry consumer lag)
+ if verify_schema_with_retry "${topic}-value"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+
+ # Verify key schema with retry (handles Schema Registry consumer lag)
+ if verify_schema_with_retry "${topic}-key"; then
+ success_count=$((success_count + 1))
+ fi
+ total_schemas=$((total_schemas + 1))
+ done
+
+ log_info "Schema verification summary: $success_count/$total_schemas schemas verified"
+
+ if [[ $success_count -eq $total_schemas ]]; then
+ log_success "All load test schemas verified successfully!"
+ return 0
+ else
+ log_error "Some schemas are missing or invalid"
+ return 1
+ fi
+}
+
+# List all registered subjects
+list_subjects() {
+ log_info "Listing all registered subjects..."
+
+ local subjects
+ subjects=$(curl -s --max-time 10 "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null)
+
+ if echo "$subjects" | jq -e '.[]' >/dev/null 2>&1; then
+ # Use process substitution instead of pipeline to avoid subshell exit code issues
+ while IFS= read -r subject; do
+ log_info " - $subject"
+ done < <(echo "$subjects" | jq -r '.[]')
+ else
+ log_warning "No subjects found or Schema Registry not accessible"
+ fi
+
+ return 0
+}
+
+# Clean up schemas (for testing)
+cleanup_schemas() {
+ log_warning "Cleaning up load test schemas..."
+
+ local topics=("loadtest-topic-0" "loadtest-topic-1" "loadtest-topic-2" "loadtest-topic-3" "loadtest-topic-4")
+
+ for topic in "${topics[@]}"; do
+ # Delete value schema (with timeout)
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-value" >/dev/null 2>&1 || true
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-value?permanent=true" >/dev/null 2>&1 || true
+
+ # Delete key schema (with timeout)
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-key" >/dev/null 2>&1 || true
+ curl -s --max-time 10 -X DELETE "$SCHEMA_REGISTRY_URL/subjects/${topic}-key?permanent=true" >/dev/null 2>&1 || true
+ done
+
+ log_success "Schema cleanup completed"
+}
+
+# Main function
+main() {
+ case "${1:-register}" in
+ "register")
+ wait_for_schema_registry
+ register_loadtest_schemas
+ ;;
+ "verify")
+ wait_for_schema_registry
+ verify_loadtest_schemas
+ ;;
+ "list")
+ wait_for_schema_registry
+ list_subjects
+ ;;
+ "cleanup")
+ wait_for_schema_registry
+ cleanup_schemas
+ ;;
+ "full")
+ wait_for_schema_registry
+ register_loadtest_schemas
+ # Wait for Schema Registry consumer to catch up before verification
+ log_info "Waiting 3 seconds for Schema Registry consumer to process all schemas..."
+ sleep 3
+ verify_loadtest_schemas
+ list_subjects
+ ;;
+ *)
+ echo "Usage: $0 [register|verify|list|cleanup|full]"
+ echo ""
+ echo "Commands:"
+ echo " register - Register load test schemas (default)"
+ echo " verify - Verify schemas are registered"
+ echo " list - List all registered subjects"
+ echo " cleanup - Clean up load test schemas"
+ echo " full - Register, verify, and list schemas"
+ echo ""
+ echo "Environment variables:"
+ echo " SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)"
+ echo " TIMEOUT - Maximum time to wait for Schema Registry (default: 60)"
+ echo " CHECK_INTERVAL - Check interval in seconds (default: 2)"
+ exit 1
+ ;;
+ esac
+
+ return 0
+}
+
+main "$@"
diff --git a/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh b/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh
new file mode 100755
index 000000000..7f6ddc79a
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh
@@ -0,0 +1,480 @@
+#!/bin/bash
+
+# Kafka Client Load Test Runner Script
+# This script helps run various load test scenarios against SeaweedFS Kafka Gateway
+
+set -euo pipefail
+
+# Default configuration
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
+DOCKER_COMPOSE_FILE="$PROJECT_DIR/docker-compose.yml"
+CONFIG_FILE="$PROJECT_DIR/config/loadtest.yaml"
+
+# Default test parameters
+TEST_MODE="comprehensive"
+TEST_DURATION="300s"
+PRODUCER_COUNT=10
+CONSUMER_COUNT=5
+MESSAGE_RATE=1000
+MESSAGE_SIZE=1024
+TOPIC_COUNT=5
+PARTITIONS_PER_TOPIC=3
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+# Function to print colored output
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARNING]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Function to show usage
+show_usage() {
+ cat << EOF
+Kafka Client Load Test Runner
+
+Usage: $0 [OPTIONS] [COMMAND]
+
+Commands:
+ start Start the load test infrastructure and run tests
+ stop Stop all services
+ restart Restart all services
+ status Show service status
+ logs Show logs from all services
+ clean Clean up all resources (volumes, networks, etc.)
+ monitor Start monitoring stack (Prometheus + Grafana)
+ scenarios Run predefined test scenarios
+
+Options:
+ -m, --mode MODE Test mode: producer, consumer, comprehensive (default: comprehensive)
+ -d, --duration DURATION Test duration (default: 300s)
+ -p, --producers COUNT Number of producers (default: 10)
+ -c, --consumers COUNT Number of consumers (default: 5)
+ -r, --rate RATE Messages per second per producer (default: 1000)
+ -s, --size SIZE Message size in bytes (default: 1024)
+ -t, --topics COUNT Number of topics (default: 5)
+ --partitions COUNT Partitions per topic (default: 3)
+ --config FILE Configuration file (default: config/loadtest.yaml)
+ --monitoring Enable monitoring stack
+ --wait-ready Wait for services to be ready before starting tests
+ -v, --verbose Verbose output
+ -h, --help Show this help message
+
+Examples:
+ # Run comprehensive test for 5 minutes
+ $0 start -m comprehensive -d 5m
+
+ # Run producer-only test with high throughput
+ $0 start -m producer -p 20 -r 2000 -d 10m
+
+ # Run consumer-only test
+ $0 start -m consumer -c 10
+
+ # Run with monitoring
+ $0 start --monitoring -d 15m
+
+ # Clean up everything
+ $0 clean
+
+Predefined Scenarios:
+ quick Quick smoke test (1 min, low load)
+ standard Standard load test (5 min, medium load)
+ stress Stress test (10 min, high load)
+ endurance Endurance test (30 min, sustained load)
+ burst Burst test (variable load)
+
+EOF
+}
+
+# Parse command line arguments
+parse_args() {
+ while [[ $# -gt 0 ]]; do
+ case $1 in
+ -m|--mode)
+ TEST_MODE="$2"
+ shift 2
+ ;;
+ -d|--duration)
+ TEST_DURATION="$2"
+ shift 2
+ ;;
+ -p|--producers)
+ PRODUCER_COUNT="$2"
+ shift 2
+ ;;
+ -c|--consumers)
+ CONSUMER_COUNT="$2"
+ shift 2
+ ;;
+ -r|--rate)
+ MESSAGE_RATE="$2"
+ shift 2
+ ;;
+ -s|--size)
+ MESSAGE_SIZE="$2"
+ shift 2
+ ;;
+ -t|--topics)
+ TOPIC_COUNT="$2"
+ shift 2
+ ;;
+ --partitions)
+ PARTITIONS_PER_TOPIC="$2"
+ shift 2
+ ;;
+ --config)
+ CONFIG_FILE="$2"
+ shift 2
+ ;;
+ --monitoring)
+ ENABLE_MONITORING=1
+ shift
+ ;;
+ --wait-ready)
+ WAIT_READY=1
+ shift
+ ;;
+ -v|--verbose)
+ VERBOSE=1
+ shift
+ ;;
+ -h|--help)
+ show_usage
+ exit 0
+ ;;
+ -*)
+ log_error "Unknown option: $1"
+ show_usage
+ exit 1
+ ;;
+            *)
+                if [[ -z "${COMMAND:-}" ]]; then
+                    COMMAND="$1"
+                elif [[ -z "${COMMAND_ARG:-}" ]]; then
+                    # Second positional argument (scenario name for `scenarios`,
+                    # optional service name for `logs`)
+                    COMMAND_ARG="$1"
+                else
+                    log_error "Too many arguments: $1"
+                    show_usage
+                    exit 1
+                fi
+                shift
+                ;;
+ esac
+ done
+}
+
+# Check if Docker and Docker Compose are available
+check_dependencies() {
+ if ! command -v docker &> /dev/null; then
+ log_error "Docker is not installed or not in PATH"
+ exit 1
+ fi
+
+ if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then
+ log_error "Docker Compose is not installed or not in PATH"
+ exit 1
+ fi
+
+ # Use docker compose if available, otherwise docker-compose
+ if docker compose version &> /dev/null; then
+ DOCKER_COMPOSE="docker compose"
+ else
+ DOCKER_COMPOSE="docker-compose"
+ fi
+}
+
+# Wait for services to be ready
+wait_for_services() {
+ log_info "Waiting for services to be ready..."
+
+ local timeout=300 # 5 minutes timeout
+ local elapsed=0
+ local check_interval=5
+
+ while [[ $elapsed -lt $timeout ]]; do
+ if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps --format table | grep -q "healthy"; then
+ if check_service_health; then
+ log_success "All services are ready!"
+ return 0
+ fi
+ fi
+
+ sleep $check_interval
+ elapsed=$((elapsed + check_interval))
+ log_info "Waiting... ($elapsed/${timeout}s)"
+ done
+
+ log_error "Services did not become ready within $timeout seconds"
+ return 1
+}
+
+# Check health of critical services
+check_service_health() {
+    # Check Kafka Gateway with a raw TCP probe; the Kafka protocol is not HTTP,
+    # so a curl against port 9093 would fail even when the gateway is healthy
+    if ! timeout 3 bash -c "</dev/tcp/localhost/9093" 2>/dev/null; then
+        return 1
+    fi
+
+ # Check Schema Registry
+ if ! curl -s http://localhost:8081/subjects >/dev/null 2>&1; then
+ return 1
+ fi
+
+ return 0
+}
+
+# Start the load test infrastructure
+start_services() {
+ log_info "Starting SeaweedFS Kafka load test infrastructure..."
+
+ # Set environment variables
+ export TEST_MODE="$TEST_MODE"
+ export TEST_DURATION="$TEST_DURATION"
+ export PRODUCER_COUNT="$PRODUCER_COUNT"
+ export CONSUMER_COUNT="$CONSUMER_COUNT"
+ export MESSAGE_RATE="$MESSAGE_RATE"
+ export MESSAGE_SIZE="$MESSAGE_SIZE"
+ export TOPIC_COUNT="$TOPIC_COUNT"
+ export PARTITIONS_PER_TOPIC="$PARTITIONS_PER_TOPIC"
+
+ # Start core services
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" up -d \
+ seaweedfs-master \
+ seaweedfs-volume \
+ seaweedfs-filer \
+ seaweedfs-mq-broker \
+ kafka-gateway \
+ schema-registry
+
+ # Start monitoring if enabled
+ if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then
+ log_info "Starting monitoring stack..."
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d
+ fi
+
+ # Wait for services to be ready if requested
+ if [[ "${WAIT_READY:-0}" == "1" ]]; then
+ wait_for_services
+ fi
+
+ log_success "Infrastructure started successfully"
+}
+
+# Run the load test
+run_loadtest() {
+ log_info "Starting Kafka client load test..."
+ log_info "Mode: $TEST_MODE, Duration: $TEST_DURATION"
+ log_info "Producers: $PRODUCER_COUNT, Consumers: $CONSUMER_COUNT"
+ log_info "Message Rate: $MESSAGE_RATE msgs/sec, Size: $MESSAGE_SIZE bytes"
+
+ # Run the load test
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest up --abort-on-container-exit kafka-client-loadtest
+
+ # Show test results
+ show_results
+}
+
+# Show test results
+show_results() {
+ log_info "Load test completed! Gathering results..."
+
+ # Get final metrics from the load test container
+ if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps kafka-client-loadtest-runner &>/dev/null; then
+ log_info "Final test statistics:"
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" exec -T kafka-client-loadtest-runner curl -s http://localhost:8080/stats || true
+ fi
+
+ # Show Prometheus metrics if monitoring is enabled
+ if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then
+ log_info "Monitoring dashboards available at:"
+ log_info " Prometheus: http://localhost:9090"
+ log_info " Grafana: http://localhost:3000 (admin/admin)"
+ fi
+
+ # Show where results are stored
+ if [[ -d "$PROJECT_DIR/test-results" ]]; then
+ log_info "Test results saved to: $PROJECT_DIR/test-results/"
+ fi
+}
+
+# Stop services
+stop_services() {
+ log_info "Stopping all services..."
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down
+ log_success "Services stopped"
+}
+
+# Show service status
+show_status() {
+ log_info "Service status:"
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps
+}
+
+# Show logs
+show_logs() {
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" logs -f "${1:-}"
+}
+
+# Clean up all resources
+clean_all() {
+ log_warning "This will remove all volumes, networks, and containers. Are you sure? (y/N)"
+ read -r response
+ if [[ "$response" =~ ^[Yy]$ ]]; then
+ log_info "Cleaning up all resources..."
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down -v --remove-orphans
+
+        # Remove any remaining volumes (the `|| true` keeps an empty grep
+        # match from aborting the script under set -o pipefail)
+        docker volume ls -q | grep -E "(kafka-client-loadtest|seaweedfs)" | xargs -r docker volume rm || true
+
+        # Remove networks
+        docker network ls -q | grep -E "kafka-client-loadtest" | xargs -r docker network rm || true
+
+ log_success "Cleanup completed"
+ else
+ log_info "Cleanup cancelled"
+ fi
+}
+
+# Run predefined scenarios
+run_scenario() {
+ local scenario="$1"
+
+ case "$scenario" in
+ quick)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="1m"
+ PRODUCER_COUNT=2
+ CONSUMER_COUNT=2
+ MESSAGE_RATE=100
+ MESSAGE_SIZE=512
+ TOPIC_COUNT=2
+ ;;
+ standard)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="5m"
+ PRODUCER_COUNT=5
+ CONSUMER_COUNT=3
+ MESSAGE_RATE=500
+ MESSAGE_SIZE=1024
+ TOPIC_COUNT=3
+ ;;
+ stress)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="10m"
+ PRODUCER_COUNT=20
+ CONSUMER_COUNT=10
+ MESSAGE_RATE=2000
+ MESSAGE_SIZE=2048
+ TOPIC_COUNT=10
+ ;;
+ endurance)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="30m"
+ PRODUCER_COUNT=10
+ CONSUMER_COUNT=5
+ MESSAGE_RATE=1000
+ MESSAGE_SIZE=1024
+ TOPIC_COUNT=5
+ ;;
+ burst)
+ TEST_MODE="comprehensive"
+ TEST_DURATION="10m"
+ PRODUCER_COUNT=10
+ CONSUMER_COUNT=5
+ MESSAGE_RATE=1000
+ MESSAGE_SIZE=1024
+ TOPIC_COUNT=5
+ # Note: Burst behavior would be configured in the load test config
+ ;;
+ *)
+ log_error "Unknown scenario: $scenario"
+ log_info "Available scenarios: quick, standard, stress, endurance, burst"
+ exit 1
+ ;;
+ esac
+
+ log_info "Running $scenario scenario..."
+ start_services
+ if [[ "${WAIT_READY:-0}" == "1" ]]; then
+ wait_for_services
+ fi
+ run_loadtest
+}
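+
+# Example scenario runs (pass --wait-ready to gate on service health first):
+#   ./scripts/run-loadtest.sh scenarios quick
+#   ./scripts/run-loadtest.sh --wait-ready scenarios stress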
+
+# Main execution
+main() {
+ if [[ $# -eq 0 ]]; then
+ show_usage
+ exit 0
+ fi
+
+ parse_args "$@"
+ check_dependencies
+
+ case "${COMMAND:-}" in
+ start)
+ start_services
+ run_loadtest
+ ;;
+ stop)
+ stop_services
+ ;;
+ restart)
+ stop_services
+ start_services
+ ;;
+ status)
+ show_status
+ ;;
+        logs)
+            show_logs "${COMMAND_ARG:-}"
+            ;;
+ clean)
+ clean_all
+ ;;
+ monitor)
+ ENABLE_MONITORING=1
+ $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d
+ log_success "Monitoring stack started"
+ log_info "Prometheus: http://localhost:9090"
+ log_info "Grafana: http://localhost:3000 (admin/admin)"
+ ;;
+        scenarios)
+            if [[ -n "${COMMAND_ARG:-}" ]]; then
+                run_scenario "$COMMAND_ARG"
+            else
+                log_error "Please specify a scenario"
+                log_info "Available scenarios: quick, standard, stress, endurance, burst"
+                exit 1
+            fi
+            ;;
+ *)
+ log_error "Unknown command: ${COMMAND:-}"
+ show_usage
+ exit 1
+ ;;
+ esac
+}
+
+# Set default values
+ENABLE_MONITORING=0
+WAIT_READY=0
+VERBOSE=0
+
+# Run main function
+main "$@"
diff --git a/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh b/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh
new file mode 100755
index 000000000..3ea43f998
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/setup-monitoring.sh
@@ -0,0 +1,352 @@
+#!/bin/bash
+
+# Setup monitoring for Kafka Client Load Test
+# This script sets up Prometheus and Grafana configurations
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
+MONITORING_DIR="$PROJECT_DIR/monitoring"
+
+# Colors
+GREEN='\033[0;32m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+# Create monitoring directory structure
+setup_directories() {
+ log_info "Setting up monitoring directories..."
+
+ mkdir -p "$MONITORING_DIR/prometheus"
+ mkdir -p "$MONITORING_DIR/grafana/dashboards"
+ mkdir -p "$MONITORING_DIR/grafana/provisioning/dashboards"
+ mkdir -p "$MONITORING_DIR/grafana/provisioning/datasources"
+
+ log_success "Directories created"
+}
+
+# Create Prometheus configuration
+create_prometheus_config() {
+ log_info "Creating Prometheus configuration..."
+
+ cat > "$MONITORING_DIR/prometheus/prometheus.yml" << 'EOF'
+# Prometheus configuration for Kafka Load Test monitoring
+
+global:
+ scrape_interval: 15s
+ evaluation_interval: 15s
+
+rule_files:
+ # - "first_rules.yml"
+ # - "second_rules.yml"
+
+scrape_configs:
+ # Scrape Prometheus itself
+ - job_name: 'prometheus'
+ static_configs:
+ - targets: ['localhost:9090']
+
+ # Scrape load test metrics
+ - job_name: 'kafka-loadtest'
+ static_configs:
+ - targets: ['kafka-client-loadtest-runner:8080']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Master metrics
+ - job_name: 'seaweedfs-master'
+ static_configs:
+ - targets: ['seaweedfs-master:9333']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Volume metrics
+ - job_name: 'seaweedfs-volume'
+ static_configs:
+ - targets: ['seaweedfs-volume:8080']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS Filer metrics
+ - job_name: 'seaweedfs-filer'
+ static_configs:
+ - targets: ['seaweedfs-filer:8888']
+ metrics_path: '/metrics'
+
+ # Scrape SeaweedFS MQ Broker metrics (if available)
+ - job_name: 'seaweedfs-mq-broker'
+ static_configs:
+ - targets: ['seaweedfs-mq-broker:17777']
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+
+ # Scrape Kafka Gateway metrics (if available)
+ - job_name: 'kafka-gateway'
+ static_configs:
+ - targets: ['kafka-gateway:9093']
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+EOF
+
+ log_success "Prometheus configuration created"
+}
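+
+# Once Prometheus is up, scrape targets can be sanity-checked via its HTTP API
+# (standard Prometheus endpoint; the jq filter is illustrative):
+#   curl -s http://localhost:9090/api/v1/targets \
+#     | jq '.data.activeTargets[] | {job: .labels.job, health: .health}'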
+
+# Create Grafana datasource configuration
+create_grafana_datasource() {
+ log_info "Creating Grafana datasource configuration..."
+
+ cat > "$MONITORING_DIR/grafana/provisioning/datasources/datasource.yml" << 'EOF'
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ type: prometheus
+ access: proxy
+ orgId: 1
+ url: http://prometheus:9090
+ basicAuth: false
+ isDefault: true
+ editable: true
+ version: 1
+EOF
+
+ log_success "Grafana datasource configuration created"
+}
+
+# Create Grafana dashboard provisioning
+create_grafana_dashboard_provisioning() {
+ log_info "Creating Grafana dashboard provisioning..."
+
+ cat > "$MONITORING_DIR/grafana/provisioning/dashboards/dashboard.yml" << 'EOF'
+apiVersion: 1
+
+providers:
+ - name: 'default'
+ orgId: 1
+ folder: ''
+ type: file
+ disableDeletion: false
+ editable: true
+ options:
+ path: /var/lib/grafana/dashboards
+EOF
+
+ log_success "Grafana dashboard provisioning created"
+}
+
+# Create Kafka Load Test dashboard
+create_loadtest_dashboard() {
+ log_info "Creating Kafka Load Test Grafana dashboard..."
+
+ cat > "$MONITORING_DIR/grafana/dashboards/kafka-loadtest.json" << 'EOF'
+{
+ "dashboard": {
+ "id": null,
+ "title": "Kafka Client Load Test Dashboard",
+ "tags": ["kafka", "loadtest", "seaweedfs"],
+ "timezone": "browser",
+ "panels": [
+ {
+ "id": 1,
+ "title": "Messages Produced/Consumed",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_messages_produced_total[5m])",
+ "legendFormat": "Produced/sec"
+ },
+ {
+ "expr": "rate(kafka_loadtest_messages_consumed_total[5m])",
+ "legendFormat": "Consumed/sec"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
+ },
+ {
+ "id": 2,
+ "title": "Message Latency",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "histogram_quantile(0.95, kafka_loadtest_message_latency_seconds)",
+ "legendFormat": "95th percentile"
+ },
+ {
+ "expr": "histogram_quantile(0.99, kafka_loadtest_message_latency_seconds)",
+ "legendFormat": "99th percentile"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
+ },
+ {
+ "id": 3,
+ "title": "Error Rates",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_producer_errors_total[5m])",
+ "legendFormat": "Producer Errors/sec"
+ },
+ {
+ "expr": "rate(kafka_loadtest_consumer_errors_total[5m])",
+ "legendFormat": "Consumer Errors/sec"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
+ },
+ {
+ "id": 4,
+ "title": "Throughput (MB/s)",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "rate(kafka_loadtest_bytes_produced_total[5m]) / 1024 / 1024",
+ "legendFormat": "Produced MB/s"
+ },
+ {
+ "expr": "rate(kafka_loadtest_bytes_consumed_total[5m]) / 1024 / 1024",
+ "legendFormat": "Consumed MB/s"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16}
+ },
+ {
+ "id": 5,
+ "title": "Active Clients",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "kafka_loadtest_active_producers",
+ "legendFormat": "Producers"
+ },
+ {
+ "expr": "kafka_loadtest_active_consumers",
+ "legendFormat": "Consumers"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16}
+ },
+ {
+ "id": 6,
+ "title": "Consumer Lag",
+ "type": "graph",
+ "targets": [
+ {
+ "expr": "kafka_loadtest_consumer_lag_messages",
+ "legendFormat": "{{consumer_group}}-{{topic}}-{{partition}}"
+ }
+ ],
+ "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24}
+ }
+ ],
+ "time": {"from": "now-30m", "to": "now"},
+ "refresh": "5s",
+ "schemaVersion": 16,
+ "version": 0
+ }
+}
+EOF
+
+ log_success "Kafka Load Test dashboard created"
+}
+
+# Create SeaweedFS dashboard
+create_seaweedfs_dashboard() {
+ log_info "Creating SeaweedFS Grafana dashboard..."
+
+ cat > "$MONITORING_DIR/grafana/dashboards/seaweedfs.json" << 'EOF'
+{
+ "dashboard": {
+ "id": null,
+ "title": "SeaweedFS Cluster Dashboard",
+ "tags": ["seaweedfs", "storage"],
+ "timezone": "browser",
+ "panels": [
+ {
+ "id": 1,
+ "title": "Master Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-master\"}",
+ "legendFormat": "Master Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 0, "y": 0}
+ },
+ {
+ "id": 2,
+ "title": "Volume Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-volume\"}",
+ "legendFormat": "Volume Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 6, "y": 0}
+ },
+ {
+ "id": 3,
+ "title": "Filer Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-filer\"}",
+ "legendFormat": "Filer Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 12, "y": 0}
+ },
+ {
+ "id": 4,
+ "title": "MQ Broker Status",
+ "type": "stat",
+ "targets": [
+ {
+ "expr": "up{job=\"seaweedfs-mq-broker\"}",
+ "legendFormat": "MQ Broker Up"
+ }
+ ],
+ "gridPos": {"h": 4, "w": 6, "x": 18, "y": 0}
+ }
+ ],
+ "time": {"from": "now-30m", "to": "now"},
+ "refresh": "10s",
+ "schemaVersion": 16,
+ "version": 0
+ }
+}
+EOF
+
+ log_success "SeaweedFS dashboard created"
+}
+
+# Main setup function
+main() {
+ log_info "Setting up monitoring for Kafka Client Load Test..."
+
+ setup_directories
+ create_prometheus_config
+ create_grafana_datasource
+ create_grafana_dashboard_provisioning
+ create_loadtest_dashboard
+ create_seaweedfs_dashboard
+
+ log_success "Monitoring setup completed!"
+ log_info "You can now start the monitoring stack with:"
+ log_info " ./scripts/run-loadtest.sh monitor"
+ log_info ""
+ log_info "After starting, access:"
+ log_info " Prometheus: http://localhost:9090"
+ log_info " Grafana: http://localhost:3000 (admin/admin)"
+}
+
+main "$@"
diff --git a/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh b/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh
new file mode 100755
index 000000000..e1a2f73e2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/test-retry-logic.sh
@@ -0,0 +1,151 @@
+#!/bin/bash
+
+# Test script to verify the retry logic works correctly
+# Simulates Schema Registry eventual consistency behavior
+
+set -euo pipefail
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[TEST]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[PASS]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[FAIL]${NC} $1"
+}
+
+# Mock function that simulates Schema Registry eventual consistency
+# First N attempts fail, then succeeds
+mock_schema_registry_query() {
+ local subject=$1
+ local min_attempts_to_succeed=$2
+ local current_attempt=$3
+
+ if [[ $current_attempt -ge $min_attempts_to_succeed ]]; then
+ # Simulate successful response
+ echo '{"id":1,"version":1,"schema":"test"}'
+ return 0
+ else
+ # Simulate 404 Not Found
+ echo '{"error_code":40401,"message":"Subject not found"}'
+ return 1
+ fi
+}
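+
+# Example: mock_schema_registry_query "demo" 3 1 prints the 404 body and
+# returns 1; with current_attempt=3 it prints the success body and returns 0.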
+
+# Simulate verify_schema_with_retry logic
+test_verify_with_retry() {
+ local subject=$1
+ local min_attempts_to_succeed=$2
+ local max_attempts=5
+ local attempt=1
+
+ log_info "Testing $subject (should succeed after $min_attempts_to_succeed attempts)"
+
+ while [[ $attempt -le $max_attempts ]]; do
+ local response
+ if response=$(mock_schema_registry_query "$subject" "$min_attempts_to_succeed" "$attempt"); then
+ if echo "$response" | grep -q '"id"'; then
+ if [[ $attempt -gt 1 ]]; then
+ log_success "$subject verified after $attempt attempts"
+ else
+ log_success "$subject verified on first attempt"
+ fi
+ return 0
+ fi
+ fi
+
+ # Schema not found, wait and retry
+ if [[ $attempt -lt $max_attempts ]]; then
+ # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s
+ local wait_time=$(echo "scale=3; 0.1 * (2 ^ ($attempt - 1))" | bc)
+ log_info " Attempt $attempt failed, waiting ${wait_time}s before retry..."
+ sleep "$wait_time"
+ attempt=$((attempt + 1))
+ else
+ log_error "$subject verification failed after $max_attempts attempts"
+ return 1
+ fi
+ done
+
+ return 1
+}
+
+# Run tests
+log_info "=========================================="
+log_info "Testing Schema Registry Retry Logic"
+log_info "=========================================="
+echo ""
+
+# Test 1: Schema available immediately
+log_info "Test 1: Schema available immediately"
+if test_verify_with_retry "immediate-schema" 1; then
+ log_success "✓ Test 1 passed"
+else
+ log_error "✗ Test 1 failed"
+ exit 1
+fi
+echo ""
+
+# Test 2: Schema available after 2 attempts (~100ms backoff)
+log_info "Test 2: Schema available after 2 attempts"
+if test_verify_with_retry "delayed-schema-2" 2; then
+ log_success "✓ Test 2 passed"
+else
+ log_error "✗ Test 2 failed"
+ exit 1
+fi
+echo ""
+
+# Test 3: Schema available after 3 attempts (~300ms backoff: 0.1s + 0.2s)
+log_info "Test 3: Schema available after 3 attempts"
+if test_verify_with_retry "delayed-schema-3" 3; then
+ log_success "✓ Test 3 passed"
+else
+ log_error "✗ Test 3 failed"
+ exit 1
+fi
+echo ""
+
+# Test 4: Schema available after 4 attempts (~700ms backoff: 0.1s + 0.2s + 0.4s)
+log_info "Test 4: Schema available after 4 attempts"
+if test_verify_with_retry "delayed-schema-4" 4; then
+ log_success "✓ Test 4 passed"
+else
+ log_error "✗ Test 4 failed"
+ exit 1
+fi
+echo ""
+
+# Test 5: Schema never available (should fail)
+log_info "Test 5: Schema never available (should fail gracefully)"
+if test_verify_with_retry "missing-schema" 10; then
+ log_error "✗ Test 5 failed (should have failed but passed)"
+ exit 1
+else
+ log_success "✓ Test 5 passed (correctly failed after max attempts)"
+fi
+echo ""
+
+log_success "=========================================="
+log_success "All tests passed! ✓"
+log_success "=========================================="
+log_info ""
+log_info "Summary:"
+log_info "- Immediate availability: works ✓"
+log_info "- 2-4 retry attempts: works ✓"
+log_info "- Max attempts handling: works ✓"
+log_info "- Exponential backoff: works ✓"
+log_info ""
+log_info "Total retry time budget: ~1.5 seconds (0.1+0.2+0.4+0.8)"
+log_info "This should handle Schema Registry consumer lag gracefully."
+
diff --git a/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh b/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh
new file mode 100755
index 000000000..d2560728b
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh
@@ -0,0 +1,291 @@
+#!/bin/bash
+
+# Wait for SeaweedFS and Kafka Gateway services to be ready
+# This script checks service health and waits until all services are operational
+
+set -euo pipefail
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[0;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+log_info() {
+ echo -e "${BLUE}[INFO]${NC} $1"
+}
+
+log_success() {
+ echo -e "${GREEN}[SUCCESS]${NC} $1"
+}
+
+log_warning() {
+ echo -e "${YELLOW}[WARNING]${NC} $1"
+}
+
+log_error() {
+ echo -e "${RED}[ERROR]${NC} $1"
+}
+
+# Configuration
+TIMEOUT=${TIMEOUT:-300} # 5 minutes default timeout
+CHECK_INTERVAL=${CHECK_INTERVAL:-5} # Check every 5 seconds
+SEAWEEDFS_MASTER_URL=${SEAWEEDFS_MASTER_URL:-"http://localhost:9333"}
+KAFKA_GATEWAY_URL=${KAFKA_GATEWAY_URL:-"localhost:9093"}
+SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-"http://localhost:8081"}
+SEAWEEDFS_FILER_URL=${SEAWEEDFS_FILER_URL:-"http://localhost:8888"}
+
+# Check if a service is reachable
+check_http_service() {
+ local url=$1
+ local name=$2
+
+ if curl -sf "$url" >/dev/null 2>&1; then
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Check TCP port
+check_tcp_service() {
+ local host=$1
+ local port=$2
+ local name=$3
+
+ if timeout 3 bash -c "</dev/tcp/$host/$port" 2>/dev/null; then
+ return 0
+ else
+ return 1
+ fi
+}
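+
+# Example: check_tcp_service localhost 9093 "Kafka Gateway"
+# (/dev/tcp is a bash feature, so this requires bash rather than POSIX sh)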
+
+# Check SeaweedFS Master
+check_seaweedfs_master() {
+ if check_http_service "$SEAWEEDFS_MASTER_URL/cluster/status" "SeaweedFS Master"; then
+ # Additional check: ensure cluster has volumes
+ local status_json
+ status_json=$(curl -s "$SEAWEEDFS_MASTER_URL/cluster/status" 2>/dev/null || echo "{}")
+
+ # Check if we have at least one volume server
+ if echo "$status_json" | grep -q '"Max":0'; then
+ log_warning "SeaweedFS Master is running but no volumes are available"
+ return 1
+ fi
+
+ return 0
+ fi
+ return 1
+}
+
+# Check SeaweedFS Filer
+check_seaweedfs_filer() {
+ check_http_service "$SEAWEEDFS_FILER_URL/" "SeaweedFS Filer"
+}
+
+# Check Kafka Gateway
+check_kafka_gateway() {
+ local host="localhost"
+ local port="9093"
+ check_tcp_service "$host" "$port" "Kafka Gateway"
+}
+
+# Check Schema Registry
+check_schema_registry() {
+ # Check if Schema Registry container is running first
+ if ! docker compose ps schema-registry | grep -q "Up"; then
+ # Schema Registry is not running, which is okay for basic tests
+ return 0
+ fi
+
+    # Wait for the Docker healthcheck to report "healthy", not merely "Up":
+    # Schema Registry has a 30s start_period, so the container can be Up well
+    # before it is actually serving requests
+ local health_status
+ health_status=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "none")
+
+ # If container has no healthcheck or healthcheck is not yet healthy, check HTTP directly
+ if [[ "$health_status" == "healthy" ]]; then
+ # Container reports healthy, do a final verification
+ if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then
+ return 0
+ fi
+ elif [[ "$health_status" == "starting" ]]; then
+ # Still in startup period, wait longer
+ return 1
+ elif [[ "$health_status" == "none" ]]; then
+ # No healthcheck defined (shouldn't happen), fall back to HTTP check
+ if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then
+ local subjects
+ subjects=$(curl -s "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null || echo "[]")
+
+ # Schema registry should at least return an empty array
+ if [[ "$subjects" == "[]" ]]; then
+ return 0
+ elif echo "$subjects" | grep -q '\['; then
+ return 0
+ else
+ log_warning "Schema Registry is not properly connected"
+ return 1
+ fi
+ fi
+ fi
+ return 1
+}
+
+# Check MQ Broker
+check_mq_broker() {
+ check_tcp_service "localhost" "17777" "SeaweedFS MQ Broker"
+}
+
+# Main health check function
+check_all_services() {
+ local all_healthy=true
+
+ log_info "Checking service health..."
+
+ # Check SeaweedFS Master
+ if check_seaweedfs_master; then
+ log_success "✓ SeaweedFS Master is healthy"
+ else
+ log_error "✗ SeaweedFS Master is not ready"
+ all_healthy=false
+ fi
+
+ # Check SeaweedFS Filer
+ if check_seaweedfs_filer; then
+ log_success "✓ SeaweedFS Filer is healthy"
+ else
+ log_error "✗ SeaweedFS Filer is not ready"
+ all_healthy=false
+ fi
+
+ # Check MQ Broker
+ if check_mq_broker; then
+ log_success "✓ SeaweedFS MQ Broker is healthy"
+ else
+ log_error "✗ SeaweedFS MQ Broker is not ready"
+ all_healthy=false
+ fi
+
+ # Check Kafka Gateway
+ if check_kafka_gateway; then
+ log_success "✓ Kafka Gateway is healthy"
+ else
+ log_error "✗ Kafka Gateway is not ready"
+ all_healthy=false
+ fi
+
+ # Check Schema Registry
+ if ! docker compose ps schema-registry | grep -q "Up"; then
+ log_warning "⚠ Schema Registry is stopped (skipping)"
+ elif check_schema_registry; then
+ log_success "✓ Schema Registry is healthy"
+ else
+ # Check if it's still starting up (healthcheck start_period)
+ local health_status
+ health_status=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "unknown")
+ if [[ "$health_status" == "starting" ]]; then
+ log_warning "⏳ Schema Registry is starting (waiting for healthcheck...)"
+ else
+ log_error "✗ Schema Registry is not ready (status: $health_status)"
+ fi
+ all_healthy=false
+ fi
+
+ $all_healthy
+}
+
+# Wait for all services to be ready
+wait_for_services() {
+ log_info "Waiting for all services to be ready (timeout: ${TIMEOUT}s)..."
+
+ local elapsed=0
+
+ while [[ $elapsed -lt $TIMEOUT ]]; do
+ if check_all_services; then
+ log_success "All services are ready! (took ${elapsed}s)"
+ return 0
+ fi
+
+ log_info "Some services are not ready yet. Waiting ${CHECK_INTERVAL}s... (${elapsed}/${TIMEOUT}s)"
+ sleep $CHECK_INTERVAL
+ elapsed=$((elapsed + CHECK_INTERVAL))
+ done
+
+ log_error "Services did not become ready within ${TIMEOUT} seconds"
+ log_error "Final service status:"
+ check_all_services
+
+ # Always dump Schema Registry diagnostics on timeout since it's the problematic service
+ log_error "==========================================="
+ log_error "Schema Registry Container Status:"
+ log_error "==========================================="
+ docker compose ps schema-registry 2>&1 || echo "Failed to get container status"
+ docker inspect loadtest-schema-registry --format='Health: {{.State.Health.Status}} ({{len .State.Health.Log}} checks)' 2>&1 || echo "Failed to inspect container"
+ log_error "==========================================="
+
+ log_error "Network Connectivity Check:"
+ log_error "==========================================="
+ log_error "Can Schema Registry reach Kafka Gateway?"
+ docker compose exec -T schema-registry ping -c 3 kafka-gateway 2>&1 || echo "Ping failed"
+ docker compose exec -T schema-registry nc -zv kafka-gateway 9093 2>&1 || echo "Port 9093 unreachable"
+ log_error "==========================================="
+
+ log_error "Schema Registry Logs (last 100 lines):"
+ log_error "==========================================="
+ docker compose logs --tail=100 schema-registry 2>&1 || echo "Failed to get Schema Registry logs"
+ log_error "==========================================="
+
+ log_error "Kafka Gateway Logs (last 50 lines with 'SR' prefix):"
+ log_error "==========================================="
+ docker compose logs --tail=200 kafka-gateway 2>&1 | grep -i "SR" | tail -50 || echo "No SR-related logs found in Kafka Gateway"
+ log_error "==========================================="
+
+ log_error "MQ Broker Logs (last 30 lines):"
+ log_error "==========================================="
+ docker compose logs --tail=30 seaweedfs-mq-broker 2>&1 || echo "Failed to get MQ Broker logs"
+ log_error "==========================================="
+
+ return 1
+}
+
+# Show current service status
+show_status() {
+ log_info "Current service status:"
+ check_all_services
+}
+
+# Main function
+main() {
+ case "${1:-wait}" in
+ "wait")
+ wait_for_services
+ ;;
+ "check")
+ show_status
+ ;;
+ "status")
+ show_status
+ ;;
+ *)
+ echo "Usage: $0 [wait|check|status]"
+ echo ""
+ echo "Commands:"
+ echo " wait - Wait for all services to be ready (default)"
+ echo " check - Check current service status"
+ echo " status - Same as check"
+ echo ""
+ echo "Environment variables:"
+ echo " TIMEOUT - Maximum time to wait in seconds (default: 300)"
+ echo " CHECK_INTERVAL - Check interval in seconds (default: 5)"
+ echo " SEAWEEDFS_MASTER_URL - Master URL (default: http://localhost:9333)"
+ echo " KAFKA_GATEWAY_URL - Gateway URL (default: localhost:9093)"
+ echo " SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)"
+ echo " SEAWEEDFS_FILER_URL - Filer URL (default: http://localhost:8888)"
+ exit 1
+ ;;
+ esac
+}
+
+main "$@"