aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/kafka-client-loadtest/scripts/wait-for-services.sh
blob: d2560728bd1bd16cf515481e7b84bd84ccfe725a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#!/bin/bash

# Wait for SeaweedFS and Kafka Gateway services to be ready
# This script checks service health and waits until all services are operational

# Strict mode: abort on unhandled command failures (-e), treat use of unset
# variables as an error (-u), and make a pipeline fail when any stage fails
# (pipefail).
set -euo pipefail

# ANSI color escape sequences used by the log_* helpers.
# They are stored as literal backslash sequences and only turned into real
# escape characters when a log helper prints them.
NC='\033[0m'           # reset to the terminal default
RED='\033[0;31m'       # errors
GREEN='\033[0;32m'     # successes
YELLOW='\033[0;33m'    # warnings
BLUE='\033[0;34m'      # informational messages

# Print an informational message prefixed with a blue "[INFO]" tag.
# Globals:   BLUE, NC (read)
# Arguments: $1 - message text (backslash escapes in it are expanded)
# Outputs:   one line on stdout
log_info() {
    printf '%b\n' "${BLUE}[INFO]${NC} $1"
}

# Print a success message prefixed with a green "[SUCCESS]" tag.
# Globals:   GREEN, NC (read)
# Arguments: $1 - message text (backslash escapes in it are expanded)
# Outputs:   one line on stdout
log_success() {
    printf '%b\n' "${GREEN}[SUCCESS]${NC} $1"
}

# Print a warning message prefixed with a yellow "[WARNING]" tag.
# Globals:   YELLOW, NC (read)
# Arguments: $1 - message text (backslash escapes in it are expanded)
# Outputs:   one line on stdout
log_warning() {
    printf '%b\n' "${YELLOW}[WARNING]${NC} $1"
}

# Print an error message prefixed with a red "[ERROR]" tag.
# Globals:   RED, NC (read)
# Arguments: $1 - message text (backslash escapes in it are expanded)
# Outputs:   one line on stdout
log_error() {
    printf '%b\n' "${RED}[ERROR]${NC} $1"
}

# Configuration: each setting keeps a value supplied through the environment
# and falls back to the default below when it is unset or empty.
: "${TIMEOUT:=300}"                                  # overall wait budget in seconds (5 minutes)
: "${CHECK_INTERVAL:=5}"                             # pause between polling rounds, in seconds
: "${SEAWEEDFS_MASTER_URL:=http://localhost:9333}"
: "${KAFKA_GATEWAY_URL:=localhost:9093}"
: "${SCHEMA_REGISTRY_URL:=http://localhost:8081}"
: "${SEAWEEDFS_FILER_URL:=http://localhost:8888}"

# Check whether an HTTP endpoint answers with a successful status.
# Globals:   HTTP_CHECK_TIMEOUT (read, optional) - per-request limit in
#            seconds, default 10
# Arguments: $1 - URL to probe
#            $2 - human-readable service name (optional, currently unused)
# Returns:   0 if curl completed successfully, 1 otherwise
check_http_service() {
    local url=$1
    local max_time=${HTTP_CHECK_TIMEOUT:-10}

    # --max-time bounds the whole request. Without it, a service that accepts
    # the connection but never responds would block the polling loop forever
    # and the overall TIMEOUT would never be enforced.
    if curl -sf --max-time "$max_time" "$url" >/dev/null 2>&1; then
        return 0
    fi
    return 1
}

# Check whether a TCP port accepts connections.
# Arguments: $1 - host name or address
#            $2 - port number
#            $3 - human-readable service name (optional, currently unused)
# Returns:   0 if the connection could be opened within 3 seconds, 1 otherwise
check_tcp_service() {
    local host=$1
    local port=$2

    # Host and port are handed to the child shell as positional parameters
    # instead of being spliced into its code string, so characters such as
    # '$', ';' or spaces in them are never re-evaluated as shell code.
    if timeout 3 bash -c '<"/dev/tcp/$1/$2"' _ "$host" "$port" 2>/dev/null; then
        return 0
    fi
    return 1
}

# Probe the SeaweedFS Master's /cluster/status endpoint.
# Globals:   SEAWEEDFS_MASTER_URL (read)
# Outputs:   a warning when the status document contains "Max":0
# Returns:   0 when the endpoint answers and does not report "Max":0,
#            1 otherwise
check_seaweedfs_master() {
    local cluster_status

    # Guard: nothing else to inspect if the endpoint does not answer.
    if ! check_http_service "$SEAWEEDFS_MASTER_URL/cluster/status" "SeaweedFS Master"; then
        return 1
    fi

    # Fetch the status document itself; fall back to an empty object on failure.
    cluster_status=$(curl -s "$SEAWEEDFS_MASTER_URL/cluster/status" 2>/dev/null || echo "{}")

    # A literal "Max":0 in the document is treated as "no volumes available".
    if [[ "$cluster_status" == *'"Max":0'* ]]; then
        log_warning "SeaweedFS Master is running but no volumes are available"
        return 1
    fi

    return 0
}

# Probe the root path of the SeaweedFS Filer over HTTP.
# Globals:   SEAWEEDFS_FILER_URL (read)
# Returns:   the status of check_http_service for the filer root URL
check_seaweedfs_filer() {
    local -r filer_root="${SEAWEEDFS_FILER_URL}/"
    check_http_service "$filer_root" "SeaweedFS Filer"
}

# Check that the Kafka Gateway accepts TCP connections.
# The endpoint is taken from KAFKA_GATEWAY_URL ("host:port", optionally with a
# "scheme://" prefix and/or a trailing path) instead of being hard-coded, so
# the documented environment variable is actually honored.
# Globals:   KAFKA_GATEWAY_URL (read; defaults to localhost:9093)
# Returns:   the status of check_tcp_service for the resolved host and port
# Note:      bracketed IPv6 literals are not parsed specially.
check_kafka_gateway() {
    local endpoint=${KAFKA_GATEWAY_URL:-localhost:9093}
    local host port

    endpoint=${endpoint#*://}   # drop an optional scheme prefix
    endpoint=${endpoint%%/*}    # drop an optional path suffix

    if [[ "$endpoint" == *:* ]]; then
        host=${endpoint%:*}
        port=${endpoint##*:}
    else
        host=$endpoint
        port=""
    fi

    # Fall back to the historical defaults for any part that is missing.
    host=${host:-localhost}
    port=${port:-9093}

    check_tcp_service "$host" "$port" "Kafka Gateway"
}

# Check Schema Registry readiness.
# Globals:   SCHEMA_REGISTRY_URL (read)
# Outputs:   a warning when /subjects answers but the body is not a JSON array
# Returns:   0 when the container is not "Up" (treated as optional) or when
#            the registry is verified ready; 1 otherwise
check_schema_registry() {
    local sr_health registered

    # A registry that is not running is acceptable for basic tests.
    docker compose ps schema-registry | grep -q "Up" || return 0

    # Ask Docker for the healthcheck verdict rather than trusting "Up" alone;
    # "none" stands in for "inspect failed / no healthcheck information".
    sr_health=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "none")

    case "$sr_health" in
        healthy)
            # Docker says healthy: confirm with one real HTTP request.
            if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then
                return 0
            fi
            ;;
        starting)
            # Still inside the healthcheck start period; keep waiting.
            return 1
            ;;
        none)
            # No healthcheck verdict available: fall back to probing HTTP.
            if check_http_service "$SCHEMA_REGISTRY_URL/subjects" "Schema Registry"; then
                registered=$(curl -s "$SCHEMA_REGISTRY_URL/subjects" 2>/dev/null || echo "[]")

                # Any body containing '[' (including the empty array "[]")
                # counts as a valid subjects listing.
                if [[ "$registered" == *'['* ]]; then
                    return 0
                fi
                log_warning "Schema Registry is not properly connected"
                return 1
            fi
            ;;
    esac

    return 1
}

# Check that the SeaweedFS MQ Broker accepts TCP connections on
# localhost:17777.
# Returns:   the status of check_tcp_service for that endpoint
check_mq_broker() {
    local -r broker_host="localhost"
    local -r broker_port="17777"
    check_tcp_service "$broker_host" "$broker_port" "SeaweedFS MQ Broker"
}

# Run every service probe once and report each result.
# Outputs:   one log line per service
# Returns:   0 when every probed service is ready, 1 if at least one is not.
#            A Schema Registry container that is not "Up" is reported as
#            skipped and does not count as a failure.
check_all_services() {
    local overall=0
    local entry probe label sr_state

    log_info "Checking service health..."

    # Core services, as "probe function|display name", checked in this order.
    for entry in \
        "check_seaweedfs_master|SeaweedFS Master" \
        "check_seaweedfs_filer|SeaweedFS Filer" \
        "check_mq_broker|SeaweedFS MQ Broker" \
        "check_kafka_gateway|Kafka Gateway"
    do
        probe=${entry%%|*}
        label=${entry#*|}
        if "$probe"; then
            log_success "✓ ${label} is healthy"
        else
            log_error "✗ ${label} is not ready"
            overall=1
        fi
    done

    # Schema Registry: optional, and it has a distinct "starting" state.
    if ! docker compose ps schema-registry | grep -q "Up"; then
        log_warning "⚠ Schema Registry is stopped (skipping)"
    elif check_schema_registry; then
        log_success "✓ Schema Registry is healthy"
    else
        # Distinguish "still in the healthcheck start period" from a failure.
        sr_state=$(docker inspect loadtest-schema-registry --format='{{.State.Health.Status}}' 2>/dev/null || echo "unknown")
        if [[ "$sr_state" == "starting" ]]; then
            log_warning "⏳ Schema Registry is starting (waiting for healthcheck...)"
        else
            log_error "✗ Schema Registry is not ready (status: $sr_state)"
        fi
        overall=1
    fi

    return "$overall"
}

# Poll check_all_services until every service is ready or TIMEOUT is used up.
# Globals:   TIMEOUT, CHECK_INTERVAL (read)
# Outputs:   progress messages; on timeout, a final status report followed by
#            container status, connectivity probes and recent logs
# Returns:   0 once all services are ready, 1 on timeout
# Note:      elapsed time is advanced by CHECK_INTERVAL per round, so time
#            spent inside the probes themselves is not counted.
wait_for_services() {
    log_info "Waiting for all services to be ready (timeout: ${TIMEOUT}s)..."

    local elapsed=0

    while [[ $elapsed -lt $TIMEOUT ]]; do
        if check_all_services; then
            log_success "All services are ready! (took ${elapsed}s)"
            return 0
        fi

        log_info "Some services are not ready yet. Waiting ${CHECK_INTERVAL}s... (${elapsed}/${TIMEOUT}s)"
        sleep "$CHECK_INTERVAL"
        elapsed=$((elapsed + CHECK_INTERVAL))
    done

    log_error "Services did not become ready within ${TIMEOUT} seconds"
    log_error "Final service status:"
    # This call is expected to fail here. It must be guarded, otherwise
    # `set -e` terminates the script right now and none of the diagnostics
    # below are ever printed.
    check_all_services || true

    # Always dump Schema Registry diagnostics on timeout since it's the problematic service
    log_error "==========================================="
    log_error "Schema Registry Container Status:"
    log_error "==========================================="
    docker compose ps schema-registry 2>&1 || echo "Failed to get container status"
    docker inspect loadtest-schema-registry --format='Health: {{.State.Health.Status}} ({{len .State.Health.Log}} checks)' 2>&1 || echo "Failed to inspect container"
    log_error "==========================================="

    log_error "Network Connectivity Check:"
    log_error "==========================================="
    log_error "Can Schema Registry reach Kafka Gateway?"
    docker compose exec -T schema-registry ping -c 3 kafka-gateway 2>&1 || echo "Ping failed"
    docker compose exec -T schema-registry nc -zv kafka-gateway 9093 2>&1 || echo "Port 9093 unreachable"
    log_error "==========================================="

    log_error "Schema Registry Logs (last 100 lines):"
    log_error "==========================================="
    docker compose logs --tail=100 schema-registry 2>&1 || echo "Failed to get Schema Registry logs"
    log_error "==========================================="

    log_error "Kafka Gateway Logs (last 50 lines with 'SR' prefix):"
    log_error "==========================================="
    # With pipefail, a grep that matches nothing fails the pipeline and
    # triggers the fallback message.
    docker compose logs --tail=200 kafka-gateway 2>&1 | grep -i "SR" | tail -50 || echo "No SR-related logs found in Kafka Gateway"
    log_error "==========================================="

    log_error "MQ Broker Logs (last 30 lines):"
    log_error "==========================================="
    docker compose logs --tail=30 seaweedfs-mq-broker 2>&1 || echo "Failed to get MQ Broker logs"
    log_error "==========================================="

    return 1
}

# Report the current state of every service once, without waiting.
# Outputs:   a heading followed by the per-service report
# Returns:   the status of check_all_services
show_status() {
    log_info "Current service status:"
    check_all_services
    return $?
}

# Dispatch on the requested command.
# Arguments: $1 - "wait" (default), "check" or "status"
# Outputs:   usage text on stdout for any other command
# Returns:   the status of the selected action; exits 1 on an unknown command
main() {
    local command=${1:-wait}

    case "$command" in
        wait)
            wait_for_services
            ;;
        check | status)
            show_status
            ;;
        *)
            printf '%s\n' \
                "Usage: $0 [wait|check|status]" \
                "" \
                "Commands:" \
                "  wait   - Wait for all services to be ready (default)" \
                "  check  - Check current service status" \
                "  status - Same as check" \
                "" \
                "Environment variables:" \
                "  TIMEOUT - Maximum time to wait in seconds (default: 300)" \
                "  CHECK_INTERVAL - Check interval in seconds (default: 5)" \
                "  SEAWEEDFS_MASTER_URL - Master URL (default: http://localhost:9333)" \
                "  KAFKA_GATEWAY_URL - Gateway URL (default: localhost:9093)" \
                "  SCHEMA_REGISTRY_URL - Schema Registry URL (default: http://localhost:8081)" \
                "  SEAWEEDFS_FILER_URL - Filer URL (default: http://localhost:8888)"
            exit 1
            ;;
    esac
}

# Entry point: forward all command-line arguments to main unchanged.
main "$@"