aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/kafka-client-loadtest/scripts/run-loadtest.sh
blob: 7f6ddc79ae7d238f4acb1f6623d173413c2b79ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
#!/bin/bash

# Kafka Client Load Test Runner Script
# This script helps run various load test scenarios against SeaweedFS Kafka Gateway

set -euo pipefail

# Default configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
DOCKER_COMPOSE_FILE="$PROJECT_DIR/docker-compose.yml"
CONFIG_FILE="$PROJECT_DIR/config/loadtest.yaml"

# Default test parameters
TEST_MODE="comprehensive"
TEST_DURATION="300s"
PRODUCER_COUNT=10
CONSUMER_COUNT=5
MESSAGE_RATE=1000
MESSAGE_SIZE=1024
TOPIC_COUNT=5
PARTITIONS_PER_TOPIC=3

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Function to print colored output
log_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

log_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

log_warning() {
    echo -e "${YELLOW}[WARNING]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# Function to show usage
show_usage() {
    cat << EOF
Kafka Client Load Test Runner

Usage: $0 [OPTIONS] [COMMAND]

Commands:
  start               Start the load test infrastructure and run tests
  stop                Stop all services
  restart             Restart all services
  status              Show service status
  logs                Show logs from all services
  clean               Clean up all resources (volumes, networks, etc.)
  monitor             Start monitoring stack (Prometheus + Grafana)
  scenarios           Run predefined test scenarios

Options:
  -m, --mode MODE           Test mode: producer, consumer, comprehensive (default: comprehensive)
  -d, --duration DURATION   Test duration (default: 300s)
  -p, --producers COUNT     Number of producers (default: 10)
  -c, --consumers COUNT     Number of consumers (default: 5)
  -r, --rate RATE          Messages per second per producer (default: 1000)
  -s, --size SIZE          Message size in bytes (default: 1024)
  -t, --topics COUNT       Number of topics (default: 5)
  --partitions COUNT       Partitions per topic (default: 3)
  --config FILE           Configuration file (default: config/loadtest.yaml)
  --monitoring            Enable monitoring stack
  --wait-ready            Wait for services to be ready before starting tests
  -v, --verbose           Verbose output
  -h, --help              Show this help message

Examples:
  # Run comprehensive test for 5 minutes
  $0 start -m comprehensive -d 5m

  # Run producer-only test with high throughput
  $0 start -m producer -p 20 -r 2000 -d 10m

  # Run consumer-only test
  $0 start -m consumer -c 10

  # Run with monitoring
  $0 start --monitoring -d 15m

  # Clean up everything
  $0 clean

Predefined Scenarios:
  quick              Quick smoke test (1 min, low load)
  standard           Standard load test (5 min, medium load) 
  stress             Stress test (10 min, high load)
  endurance          Endurance test (30 min, sustained load)
  burst              Burst test (variable load)

EOF
}

# Parse command line arguments
parse_args() {
    while [[ $# -gt 0 ]]; do
        case $1 in
            -m|--mode)
                TEST_MODE="$2"
                shift 2
                ;;
            -d|--duration)
                TEST_DURATION="$2"
                shift 2
                ;;
            -p|--producers)
                PRODUCER_COUNT="$2"
                shift 2
                ;;
            -c|--consumers)
                CONSUMER_COUNT="$2"
                shift 2
                ;;
            -r|--rate)
                MESSAGE_RATE="$2"
                shift 2
                ;;
            -s|--size)
                MESSAGE_SIZE="$2"
                shift 2
                ;;
            -t|--topics)
                TOPIC_COUNT="$2"
                shift 2
                ;;
            --partitions)
                PARTITIONS_PER_TOPIC="$2"
                shift 2
                ;;
            --config)
                CONFIG_FILE="$2"
                shift 2
                ;;
            --monitoring)
                ENABLE_MONITORING=1
                shift
                ;;
            --wait-ready)
                WAIT_READY=1
                shift
                ;;
            -v|--verbose)
                VERBOSE=1
                shift
                ;;
            -h|--help)
                show_usage
                exit 0
                ;;
            -*)
                log_error "Unknown option: $1"
                show_usage
                exit 1
                ;;
            *)
                if [[ -z "${COMMAND:-}" ]]; then
                    COMMAND="$1"
                else
                    log_error "Multiple commands specified"
                    show_usage
                    exit 1
                fi
                shift
                ;;
        esac
    done
}

# Check if Docker and Docker Compose are available
check_dependencies() {
    if ! command -v docker &> /dev/null; then
        log_error "Docker is not installed or not in PATH"
        exit 1
    fi
    
    if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then
        log_error "Docker Compose is not installed or not in PATH"
        exit 1
    fi
    
    # Use docker compose if available, otherwise docker-compose
    if docker compose version &> /dev/null; then
        DOCKER_COMPOSE="docker compose"
    else
        DOCKER_COMPOSE="docker-compose"
    fi
}

# Wait for services to be ready
wait_for_services() {
    log_info "Waiting for services to be ready..."
    
    local timeout=300  # 5 minutes timeout
    local elapsed=0
    local check_interval=5
    
    while [[ $elapsed -lt $timeout ]]; do
        if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps --format table | grep -q "healthy"; then
            if check_service_health; then
                log_success "All services are ready!"
                return 0
            fi
        fi
        
        sleep $check_interval
        elapsed=$((elapsed + check_interval))
        log_info "Waiting... ($elapsed/${timeout}s)"
    done
    
    log_error "Services did not become ready within $timeout seconds"
    return 1
}

# Check health of critical services
check_service_health() {
    # Check Kafka Gateway
    if ! curl -s http://localhost:9093 >/dev/null 2>&1; then
        return 1
    fi
    
    # Check Schema Registry
    if ! curl -s http://localhost:8081/subjects >/dev/null 2>&1; then
        return 1
    fi
    
    return 0
}

# Start the load test infrastructure
start_services() {
    log_info "Starting SeaweedFS Kafka load test infrastructure..."
    
    # Set environment variables
    export TEST_MODE="$TEST_MODE"
    export TEST_DURATION="$TEST_DURATION"
    export PRODUCER_COUNT="$PRODUCER_COUNT"
    export CONSUMER_COUNT="$CONSUMER_COUNT"
    export MESSAGE_RATE="$MESSAGE_RATE"
    export MESSAGE_SIZE="$MESSAGE_SIZE"
    export TOPIC_COUNT="$TOPIC_COUNT"
    export PARTITIONS_PER_TOPIC="$PARTITIONS_PER_TOPIC"
    
    # Start core services
    $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" up -d \
        seaweedfs-master \
        seaweedfs-volume \
        seaweedfs-filer \
        seaweedfs-mq-broker \
        kafka-gateway \
        schema-registry
    
    # Start monitoring if enabled
    if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then
        log_info "Starting monitoring stack..."
        $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d
    fi
    
    # Wait for services to be ready if requested
    if [[ "${WAIT_READY:-0}" == "1" ]]; then
        wait_for_services
    fi
    
    log_success "Infrastructure started successfully"
}

# Run the load test
run_loadtest() {
    log_info "Starting Kafka client load test..."
    log_info "Mode: $TEST_MODE, Duration: $TEST_DURATION"
    log_info "Producers: $PRODUCER_COUNT, Consumers: $CONSUMER_COUNT"
    log_info "Message Rate: $MESSAGE_RATE msgs/sec, Size: $MESSAGE_SIZE bytes"
    
    # Run the load test
    $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest up --abort-on-container-exit kafka-client-loadtest
    
    # Show test results
    show_results
}

# Show test results
show_results() {
    log_info "Load test completed! Gathering results..."
    
    # Get final metrics from the load test container
    if $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps kafka-client-loadtest-runner &>/dev/null; then
        log_info "Final test statistics:"
        $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" exec -T kafka-client-loadtest-runner curl -s http://localhost:8080/stats || true
    fi
    
    # Show Prometheus metrics if monitoring is enabled
    if [[ "${ENABLE_MONITORING:-0}" == "1" ]]; then
        log_info "Monitoring dashboards available at:"
        log_info "  Prometheus: http://localhost:9090"
        log_info "  Grafana:    http://localhost:3000 (admin/admin)"
    fi
    
    # Show where results are stored
    if [[ -d "$PROJECT_DIR/test-results" ]]; then
        log_info "Test results saved to: $PROJECT_DIR/test-results/"
    fi
}

# Stop services
stop_services() {
    log_info "Stopping all services..."
    $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down
    log_success "Services stopped"
}

# Show service status
show_status() {
    log_info "Service status:"
    $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" ps
}

# Show logs
show_logs() {
    $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" logs -f "${1:-}"
}

# Clean up all resources
clean_all() {
    log_warning "This will remove all volumes, networks, and containers. Are you sure? (y/N)"
    read -r response
    if [[ "$response" =~ ^[Yy]$ ]]; then
        log_info "Cleaning up all resources..."
        $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile loadtest --profile monitoring down -v --remove-orphans
        
        # Remove any remaining volumes
        docker volume ls -q | grep -E "(kafka-client-loadtest|seaweedfs)" | xargs -r docker volume rm
        
        # Remove networks
        docker network ls -q | grep -E "kafka-client-loadtest" | xargs -r docker network rm
        
        log_success "Cleanup completed"
    else
        log_info "Cleanup cancelled"
    fi
}

# Run predefined scenarios
run_scenario() {
    local scenario="$1"
    
    case "$scenario" in
        quick)
            TEST_MODE="comprehensive"
            TEST_DURATION="1m"
            PRODUCER_COUNT=2
            CONSUMER_COUNT=2
            MESSAGE_RATE=100
            MESSAGE_SIZE=512
            TOPIC_COUNT=2
            ;;
        standard)
            TEST_MODE="comprehensive"
            TEST_DURATION="5m"
            PRODUCER_COUNT=5
            CONSUMER_COUNT=3
            MESSAGE_RATE=500
            MESSAGE_SIZE=1024
            TOPIC_COUNT=3
            ;;
        stress)
            TEST_MODE="comprehensive"
            TEST_DURATION="10m"
            PRODUCER_COUNT=20
            CONSUMER_COUNT=10
            MESSAGE_RATE=2000
            MESSAGE_SIZE=2048
            TOPIC_COUNT=10
            ;;
        endurance)
            TEST_MODE="comprehensive"
            TEST_DURATION="30m"
            PRODUCER_COUNT=10
            CONSUMER_COUNT=5
            MESSAGE_RATE=1000
            MESSAGE_SIZE=1024
            TOPIC_COUNT=5
            ;;
        burst)
            TEST_MODE="comprehensive"
            TEST_DURATION="10m"
            PRODUCER_COUNT=10
            CONSUMER_COUNT=5
            MESSAGE_RATE=1000
            MESSAGE_SIZE=1024
            TOPIC_COUNT=5
            # Note: Burst behavior would be configured in the load test config
            ;;
        *)
            log_error "Unknown scenario: $scenario"
            log_info "Available scenarios: quick, standard, stress, endurance, burst"
            exit 1
            ;;
    esac
    
    log_info "Running $scenario scenario..."
    start_services
    if [[ "${WAIT_READY:-0}" == "1" ]]; then
        wait_for_services
    fi
    run_loadtest
}

# Main execution
main() {
    if [[ $# -eq 0 ]]; then
        show_usage
        exit 0
    fi
    
    parse_args "$@"
    check_dependencies
    
    case "${COMMAND:-}" in
        start)
            start_services
            run_loadtest
            ;;
        stop)
            stop_services
            ;;
        restart)
            stop_services
            start_services
            ;;
        status)
            show_status
            ;;
        logs)
            show_logs
            ;;
        clean)
            clean_all
            ;;
        monitor)
            ENABLE_MONITORING=1
            $DOCKER_COMPOSE -f "$DOCKER_COMPOSE_FILE" --profile monitoring up -d
            log_success "Monitoring stack started"
            log_info "Prometheus: http://localhost:9090"
            log_info "Grafana:    http://localhost:3000 (admin/admin)"
            ;;
        scenarios)
            if [[ -n "${2:-}" ]]; then
                run_scenario "$2"
            else
                log_error "Please specify a scenario"
                log_info "Available scenarios: quick, standard, stress, endurance, burst"
                exit 1
            fi
            ;;
        *)
            log_error "Unknown command: ${COMMAND:-}"
            show_usage
            exit 1
            ;;
    esac
}

# Set default values
ENABLE_MONITORING=0
WAIT_READY=0
VERBOSE=0

# Run main function
main "$@"