diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-06-28 14:11:55 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-28 14:11:55 -0700 |
| commit | a1aab8a083111dd9357c7f35397fdf610f23cb6d (patch) | |
| tree | 66b56b09bec60cd1962236b7aad43a37011450b9 /telemetry/server | |
| parent | 29892c43ff95ad957c0f64ad5cd11e0d43e616e2 (diff) | |
| download | seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.tar.xz seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.zip | |
add telemetry (#6926)
* add telemetry
* fix go mod
* add default telemetry server url
* Update README.md
* replace with broker count instead of s3 count
* Update telemetry.pb.go
* github action to deploy
Diffstat (limited to 'telemetry/server')
| -rw-r--r-- | telemetry/server/Dockerfile | 18 | ||||
| -rw-r--r-- | telemetry/server/Makefile | 97 | ||||
| -rw-r--r-- | telemetry/server/api/handlers.go | 152 | ||||
| -rw-r--r-- | telemetry/server/dashboard/dashboard.go | 278 | ||||
| -rw-r--r-- | telemetry/server/go.sum | 31 | ||||
| -rw-r--r-- | telemetry/server/main.go | 111 | ||||
| -rw-r--r-- | telemetry/server/storage/prometheus.go | 245 |
7 files changed, 932 insertions, 0 deletions
diff --git a/telemetry/server/Dockerfile b/telemetry/server/Dockerfile new file mode 100644 index 000000000..8f3782fcf --- /dev/null +++ b/telemetry/server/Dockerfile @@ -0,0 +1,18 @@ +FROM golang:1.21-alpine AS builder + +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o telemetry-server . + +FROM alpine:latest +RUN apk --no-cache add ca-certificates +WORKDIR /root/ + +COPY --from=builder /app/telemetry-server . + +EXPOSE 8080 + +CMD ["./telemetry-server"]
\ No newline at end of file diff --git a/telemetry/server/Makefile b/telemetry/server/Makefile new file mode 100644 index 000000000..cf57f1777 --- /dev/null +++ b/telemetry/server/Makefile @@ -0,0 +1,97 @@ +.PHONY: build run clean test deps proto integration-test test-all + +# Build the telemetry server +build: + go build -o telemetry-server . + +# Run the server in development mode +run: + go run . -port=8080 -dashboard=true -cleanup=1h -max-age=24h + +# Run the server in production mode +run-prod: + ./telemetry-server -port=8080 -dashboard=true -cleanup=24h -max-age=720h + +# Clean build artifacts +clean: + rm -f telemetry-server + rm -f ../test/telemetry-server-test.log + go clean + +# Run unit tests +test: + go test ./... + +# Run integration tests +integration-test: + @echo "🧪 Running telemetry integration tests..." + cd ../../ && go run telemetry/test/integration.go + +# Run all tests (unit + integration) +test-all: test integration-test + +# Install dependencies +deps: + go mod download + go mod tidy + +# Generate protobuf code (requires protoc) +proto: + cd .. && protoc --go_out=. --go_opt=paths=source_relative proto/telemetry.proto + +# Build Docker image +docker-build: + docker build -t seaweedfs-telemetry . + +# Run with Docker +docker-run: + docker run -p 8080:8080 seaweedfs-telemetry -port=8080 -dashboard=true + +# Development with auto-reload (requires air: go install github.com/cosmtrek/air@latest) +dev: + air + +# Check if protoc is available +check-protoc: + @which protoc > /dev/null || (echo "protoc is required for proto generation. Install from https://grpc.io/docs/protoc-installation/" && exit 1) + +# Full development setup +setup: check-protoc deps proto build + +# Run a quick smoke test +smoke-test: build + @echo "🔥 Running smoke test..." + @timeout 10s ./telemetry-server -port=18081 > /dev/null 2>&1 & \ + SERVER_PID=$$!; \ + sleep 2; \ + if curl -s http://localhost:18081/health > /dev/null; then \ + echo "✅ Smoke test passed - server responds to health check"; \ + else \ + echo "❌ Smoke test failed - server not responding"; \ + exit 1; \ + fi; \ + kill $$SERVER_PID 2>/dev/null || true + +# Continuous integration target +ci: deps proto build test integration-test + @echo "🎉 All CI tests passed!" + +# Help +help: + @echo "Available targets:" + @echo " build - Build the telemetry server binary" + @echo " run - Run server in development mode" + @echo " run-prod - Run server in production mode" + @echo " clean - Clean build artifacts" + @echo " test - Run unit tests" + @echo " integration-test- Run integration tests" + @echo " test-all - Run all tests (unit + integration)" + @echo " deps - Install Go dependencies" + @echo " proto - Generate protobuf code" + @echo " docker-build - Build Docker image" + @echo " docker-run - Run with Docker" + @echo " dev - Run with auto-reload (requires air)" + @echo " smoke-test - Quick server health check" + @echo " setup - Full development setup" + @echo " ci - Continuous integration (all tests)" + @echo " help - Show this help"
\ No newline at end of file diff --git a/telemetry/server/api/handlers.go b/telemetry/server/api/handlers.go new file mode 100644 index 000000000..0ff00330b --- /dev/null +++ b/telemetry/server/api/handlers.go @@ -0,0 +1,152 @@ +package api + +import ( + "encoding/json" + "io" + "net/http" + "strconv" + "time" + + "github.com/seaweedfs/seaweedfs/telemetry/proto" + "github.com/seaweedfs/seaweedfs/telemetry/server/storage" + protobuf "google.golang.org/protobuf/proto" +) + +type Handler struct { + storage *storage.PrometheusStorage +} + +func NewHandler(storage *storage.PrometheusStorage) *Handler { + return &Handler{storage: storage} +} + +func (h *Handler) CollectTelemetry(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + contentType := r.Header.Get("Content-Type") + + // Only accept protobuf content type + if contentType != "application/x-protobuf" && contentType != "application/protobuf" { + http.Error(w, "Content-Type must be application/x-protobuf", http.StatusUnsupportedMediaType) + return + } + + // Read protobuf request + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + + req := &proto.TelemetryRequest{} + if err := protobuf.Unmarshal(body, req); err != nil { + http.Error(w, "Invalid protobuf data", http.StatusBadRequest) + return + } + + data := req.Data + if data == nil { + http.Error(w, "Missing telemetry data", http.StatusBadRequest) + return + } + + // Validate required fields + if data.ClusterId == "" || data.Version == "" || data.Os == "" { + http.Error(w, "Missing required fields", http.StatusBadRequest) + return + } + + // Set timestamp if not provided + if data.Timestamp == 0 { + data.Timestamp = time.Now().Unix() + } + + // Store the telemetry data + if err := h.storage.StoreTelemetry(data); err != nil { + http.Error(w, "Failed to store data", 
http.StatusInternalServerError) + return + } + + // Return protobuf response + resp := &proto.TelemetryResponse{ + Success: true, + Message: "Telemetry data received", + } + + respData, err := protobuf.Marshal(resp) + if err != nil { + http.Error(w, "Failed to marshal response", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(http.StatusOK) + w.Write(respData) +} + +func (h *Handler) GetStats(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + stats, err := h.storage.GetStats() + if err != nil { + http.Error(w, "Failed to get stats", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +func (h *Handler) GetInstances(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + limitStr := r.URL.Query().Get("limit") + limit := 100 // default + if limitStr != "" { + if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { + limit = l + } + } + + instances, err := h.storage.GetInstances(limit) + if err != nil { + http.Error(w, "Failed to get instances", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(instances) +} + +func (h *Handler) GetMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + daysStr := r.URL.Query().Get("days") + days := 30 // default + if daysStr != "" { + if d, err := strconv.Atoi(daysStr); err == nil && d > 0 && d <= 365 { + days = d + } + } + + metrics, err := h.storage.GetMetrics(days) + if err != nil { + http.Error(w, "Failed to get metrics", http.StatusInternalServerError) + 
return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(metrics) +} diff --git a/telemetry/server/dashboard/dashboard.go b/telemetry/server/dashboard/dashboard.go new file mode 100644 index 000000000..9a56c7f1b --- /dev/null +++ b/telemetry/server/dashboard/dashboard.go @@ -0,0 +1,278 @@ +package dashboard + +import ( + "net/http" +) + +type Handler struct{} + +func NewHandler() *Handler { + return &Handler{} +} + +func (h *Handler) ServeIndex(w http.ResponseWriter, r *http.Request) { + html := `<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>SeaweedFS Telemetry Dashboard</title> + <script src="https://cdn.jsdelivr.net/npm/chart.js"></script> + <style> + body { + font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; + margin: 0; + padding: 20px; + background-color: #f5f5f5; + } + .container { + max-width: 1200px; + margin: 0 auto; + } + .header { + background: white; + padding: 20px; + border-radius: 8px; + margin-bottom: 20px; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); + } + .stats-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); + gap: 20px; + margin-bottom: 20px; + } + .stat-card { + background: white; + padding: 20px; + border-radius: 8px; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); + } + .stat-value { + font-size: 2em; + font-weight: bold; + color: #2196F3; + } + .stat-label { + color: #666; + margin-top: 5px; + } + .chart-container { + background: white; + padding: 20px; + border-radius: 8px; + margin-bottom: 20px; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); + } + .chart-title { + font-size: 1.2em; + font-weight: bold; + margin-bottom: 15px; + } + .loading { + text-align: center; + padding: 40px; + color: #666; + } + .error { + background: #ffebee; + color: #c62828; + padding: 15px; + border-radius: 4px; + margin: 10px 0; + } + </style> +</head> +<body> + <div 
class="container"> + <div class="header"> + <h1>SeaweedFS Telemetry Dashboard</h1> + <p>Privacy-respecting usage analytics for SeaweedFS</p> + </div> + + <div id="loading" class="loading">Loading telemetry data...</div> + <div id="error" class="error" style="display: none;"></div> + + <div id="dashboard" style="display: none;"> + <div class="stats-grid"> + <div class="stat-card"> + <div class="stat-value" id="totalInstances">-</div> + <div class="stat-label">Total Instances (30 days)</div> + </div> + <div class="stat-card"> + <div class="stat-value" id="activeInstances">-</div> + <div class="stat-label">Active Instances (7 days)</div> + </div> + <div class="stat-card"> + <div class="stat-value" id="totalVersions">-</div> + <div class="stat-label">Different Versions</div> + </div> + <div class="stat-card"> + <div class="stat-value" id="totalOS">-</div> + <div class="stat-label">Operating Systems</div> + </div> + </div> + + <div class="chart-container"> + <div class="chart-title">Version Distribution</div> + <canvas id="versionChart" width="400" height="200"></canvas> + </div> + + <div class="chart-container"> + <div class="chart-title">Operating System Distribution</div> + <canvas id="osChart" width="400" height="200"></canvas> + </div> + + <div class="chart-container"> + <div class="chart-title">Deployment Types</div> + <canvas id="deploymentChart" width="400" height="200"></canvas> + </div> + + <div class="chart-container"> + <div class="chart-title">Volume Servers Over Time</div> + <canvas id="serverChart" width="400" height="200"></canvas> + </div> + + <div class="chart-container"> + <div class="chart-title">Total Disk Usage Over Time</div> + <canvas id="diskChart" width="400" height="200"></canvas> + </div> + </div> + </div> + + <script> + let charts = {}; + + async function loadDashboard() { + try { + // Load stats + const statsResponse = await fetch('/api/stats'); + const stats = await statsResponse.json(); + + // Load metrics + const metricsResponse = await 
fetch('/api/metrics?days=30'); + const metrics = await metricsResponse.json(); + + updateStats(stats); + updateCharts(stats, metrics); + + document.getElementById('loading').style.display = 'none'; + document.getElementById('dashboard').style.display = 'block'; + } catch (error) { + console.error('Error loading dashboard:', error); + showError('Failed to load telemetry data: ' + error.message); + } + } + + function updateStats(stats) { + document.getElementById('totalInstances').textContent = stats.total_instances || 0; + document.getElementById('activeInstances').textContent = stats.active_instances || 0; + document.getElementById('totalVersions').textContent = Object.keys(stats.versions || {}).length; + document.getElementById('totalOS').textContent = Object.keys(stats.os_distribution || {}).length; + } + + function updateCharts(stats, metrics) { + // Version chart + createPieChart('versionChart', 'Version Distribution', stats.versions || {}); + + // OS chart + createPieChart('osChart', 'Operating System Distribution', stats.os_distribution || {}); + + // Deployment chart + createPieChart('deploymentChart', 'Deployment Types', stats.deployments || {}); + + // Server count over time + if (metrics.dates && metrics.server_counts) { + createLineChart('serverChart', 'Volume Servers', metrics.dates, metrics.server_counts, '#2196F3'); + } + + // Disk usage over time + if (metrics.dates && metrics.disk_usage) { + const diskUsageGB = metrics.disk_usage.map(bytes => Math.round(bytes / (1024 * 1024 * 1024))); + createLineChart('diskChart', 'Disk Usage (GB)', metrics.dates, diskUsageGB, '#4CAF50'); + } + } + + function createPieChart(canvasId, title, data) { + const ctx = document.getElementById(canvasId).getContext('2d'); + + if (charts[canvasId]) { + charts[canvasId].destroy(); + } + + const labels = Object.keys(data); + const values = Object.values(data); + + charts[canvasId] = new Chart(ctx, { + type: 'pie', + data: { + labels: labels, + datasets: [{ + data: values, + 
backgroundColor: [ + '#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', + '#9966FF', '#FF9F40', '#FF6384', '#C9CBCF' + ] + }] + }, + options: { + responsive: true, + plugins: { + legend: { + position: 'bottom' + } + } + } + }); + } + + function createLineChart(canvasId, label, labels, data, color) { + const ctx = document.getElementById(canvasId).getContext('2d'); + + if (charts[canvasId]) { + charts[canvasId].destroy(); + } + + charts[canvasId] = new Chart(ctx, { + type: 'line', + data: { + labels: labels, + datasets: [{ + label: label, + data: data, + borderColor: color, + backgroundColor: color + '20', + fill: true, + tension: 0.1 + }] + }, + options: { + responsive: true, + scales: { + y: { + beginAtZero: true + } + } + } + }); + } + + function showError(message) { + document.getElementById('loading').style.display = 'none'; + document.getElementById('error').style.display = 'block'; + document.getElementById('error').textContent = message; + } + + // Load dashboard on page load + loadDashboard(); + + // Refresh every 5 minutes + setInterval(loadDashboard, 5 * 60 * 1000); + </script> +</body> +</html>` + + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(http.StatusOK) + w.Write([]byte(html)) +} diff --git a/telemetry/server/go.sum b/telemetry/server/go.sum new file mode 100644 index 000000000..0aec189da --- /dev/null +++ b/telemetry/server/go.sum @@ -0,0 +1,31 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf 
v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/telemetry/server/main.go b/telemetry/server/main.go new file mode 100644 index 000000000..6cbae05c7 --- /dev/null +++ b/telemetry/server/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/seaweedfs/seaweedfs/telemetry/server/api" + "github.com/seaweedfs/seaweedfs/telemetry/server/dashboard" + "github.com/seaweedfs/seaweedfs/telemetry/server/storage" +) + +var ( + port = flag.Int("port", 8080, "HTTP server port") + enableCORS = flag.Bool("cors", true, "Enable CORS for dashboard") + logRequests = flag.Bool("log", true, "Log incoming requests") + enableDashboard = flag.Bool("dashboard", true, "Enable built-in dashboard (optional when using Grafana)") + cleanupInterval = flag.Duration("cleanup", 24*time.Hour, "Cleanup interval for old instances") + maxInstanceAge = flag.Duration("max-age", 30*24*time.Hour, "Maximum age for instances before cleanup") +) + +func main() { + flag.Parse() + + // Create Prometheus storage instance + store := storage.NewPrometheusStorage() + + // Start cleanup routine + go func() { + ticker := time.NewTicker(*cleanupInterval) + defer ticker.Stop() + for range ticker.C { + store.CleanupOldInstances(*maxInstanceAge) + } + }() + + // Setup HTTP handlers + mux := http.NewServeMux() + + // Prometheus metrics endpoint + mux.Handle("/metrics", promhttp.Handler()) + + // API endpoints + apiHandler := api.NewHandler(store) + mux.HandleFunc("/api/collect", corsMiddleware(logMiddleware(apiHandler.CollectTelemetry))) + mux.HandleFunc("/api/stats", 
corsMiddleware(logMiddleware(apiHandler.GetStats))) + mux.HandleFunc("/api/instances", corsMiddleware(logMiddleware(apiHandler.GetInstances))) + mux.HandleFunc("/api/metrics", corsMiddleware(logMiddleware(apiHandler.GetMetrics))) + + // Dashboard (optional) + if *enableDashboard { + dashboardHandler := dashboard.NewHandler() + mux.HandleFunc("/", corsMiddleware(dashboardHandler.ServeIndex)) + mux.HandleFunc("/dashboard", corsMiddleware(dashboardHandler.ServeIndex)) + mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("./static")))) + } + + // Health check + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "status": "ok", + "time": time.Now().UTC().Format(time.RFC3339), + }) + }) + + addr := fmt.Sprintf(":%d", *port) + log.Printf("Starting telemetry server on %s", addr) + log.Printf("Prometheus metrics: http://localhost%s/metrics", addr) + if *enableDashboard { + log.Printf("Dashboard: http://localhost%s/dashboard", addr) + } + log.Printf("Cleanup interval: %v, Max instance age: %v", *cleanupInterval, *maxInstanceAge) + + if err := http.ListenAndServe(addr, mux); err != nil { + log.Fatalf("Server failed: %v", err) + } +} + +func corsMiddleware(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if *enableCORS { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + } + + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + next(w, r) + } +} + +func logMiddleware(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if *logRequests { + start := time.Now() + next(w, r) + log.Printf("%s %s %s %v", r.Method, r.URL.Path, r.RemoteAddr, time.Since(start)) + } 
else { + next(w, r) + } + } +} diff --git a/telemetry/server/storage/prometheus.go b/telemetry/server/storage/prometheus.go new file mode 100644 index 000000000..d25dd669a --- /dev/null +++ b/telemetry/server/storage/prometheus.go @@ -0,0 +1,245 @@ +package storage + +import ( + "encoding/json" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/seaweedfs/seaweedfs/telemetry/proto" +) + +type PrometheusStorage struct { + // Prometheus metrics + totalClusters prometheus.Gauge + activeClusters prometheus.Gauge + volumeServerCount *prometheus.GaugeVec + totalDiskBytes *prometheus.GaugeVec + totalVolumeCount *prometheus.GaugeVec + filerCount *prometheus.GaugeVec + brokerCount *prometheus.GaugeVec + clusterInfo *prometheus.GaugeVec + telemetryReceived prometheus.Counter + + // In-memory storage for API endpoints (if needed) + mu sync.RWMutex + instances map[string]*telemetryData + stats map[string]interface{} +} + +// telemetryData is an internal struct that includes the received timestamp +type telemetryData struct { + *proto.TelemetryData + ReceivedAt time.Time `json:"received_at"` +} + +func NewPrometheusStorage() *PrometheusStorage { + return &PrometheusStorage{ + totalClusters: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_total_clusters", + Help: "Total number of unique SeaweedFS clusters (last 30 days)", + }), + activeClusters: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_active_clusters", + Help: "Number of active SeaweedFS clusters (last 7 days)", + }), + volumeServerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_volume_servers", + Help: "Number of volume servers per cluster", + }, []string{"cluster_id", "version", "os", "deployment"}), + totalDiskBytes: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_disk_bytes", + Help: "Total disk usage in bytes per cluster", + }, 
[]string{"cluster_id", "version", "os", "deployment"}), + totalVolumeCount: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_volume_count", + Help: "Total number of volumes per cluster", + }, []string{"cluster_id", "version", "os", "deployment"}), + filerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_filer_count", + Help: "Number of filer servers per cluster", + }, []string{"cluster_id", "version", "os", "deployment"}), + brokerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_broker_count", + Help: "Number of broker servers per cluster", + }, []string{"cluster_id", "version", "os", "deployment"}), + clusterInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "seaweedfs_telemetry_cluster_info", + Help: "Cluster information (always 1, labels contain metadata)", + }, []string{"cluster_id", "version", "os", "deployment", "features"}), + telemetryReceived: promauto.NewCounter(prometheus.CounterOpts{ + Name: "seaweedfs_telemetry_reports_received_total", + Help: "Total number of telemetry reports received", + }), + instances: make(map[string]*telemetryData), + stats: make(map[string]interface{}), + } +} + +func (s *PrometheusStorage) StoreTelemetry(data *proto.TelemetryData) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Update Prometheus metrics + labels := prometheus.Labels{ + "cluster_id": data.ClusterId, + "version": data.Version, + "os": data.Os, + "deployment": data.Deployment, + } + + s.volumeServerCount.With(labels).Set(float64(data.VolumeServerCount)) + s.totalDiskBytes.With(labels).Set(float64(data.TotalDiskBytes)) + s.totalVolumeCount.With(labels).Set(float64(data.TotalVolumeCount)) + s.filerCount.With(labels).Set(float64(data.FilerCount)) + s.brokerCount.With(labels).Set(float64(data.BrokerCount)) + + // Features as JSON string for the label + featuresJSON, _ := json.Marshal(data.Features) + infoLabels := prometheus.Labels{ + "cluster_id": data.ClusterId, + "version": 
data.Version, + "os": data.Os, + "deployment": data.Deployment, + "features": string(featuresJSON), + } + s.clusterInfo.With(infoLabels).Set(1) + + s.telemetryReceived.Inc() + + // Store in memory for API endpoints + s.instances[data.ClusterId] = &telemetryData{ + TelemetryData: data, + ReceivedAt: time.Now().UTC(), + } + + // Update aggregated stats + s.updateStats() + + return nil +} + +func (s *PrometheusStorage) GetStats() (map[string]interface{}, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // Return cached stats + result := make(map[string]interface{}) + for k, v := range s.stats { + result[k] = v + } + return result, nil +} + +func (s *PrometheusStorage) GetInstances(limit int) ([]*telemetryData, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var instances []*telemetryData + count := 0 + for _, instance := range s.instances { + if count >= limit { + break + } + instances = append(instances, instance) + count++ + } + + return instances, nil +} + +func (s *PrometheusStorage) GetMetrics(days int) (map[string]interface{}, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + // Return current metrics from in-memory storage + // Historical data should be queried from Prometheus directly + cutoff := time.Now().AddDate(0, 0, -days) + + var volumeServers []map[string]interface{} + var diskUsage []map[string]interface{} + + for _, instance := range s.instances { + if instance.ReceivedAt.After(cutoff) { + volumeServers = append(volumeServers, map[string]interface{}{ + "date": instance.ReceivedAt.Format("2006-01-02"), + "value": instance.TelemetryData.VolumeServerCount, + }) + diskUsage = append(diskUsage, map[string]interface{}{ + "date": instance.ReceivedAt.Format("2006-01-02"), + "value": instance.TelemetryData.TotalDiskBytes, + }) + } + } + + return map[string]interface{}{ + "volume_servers": volumeServers, + "disk_usage": diskUsage, + }, nil +} + +func (s *PrometheusStorage) updateStats() { + now := time.Now() + last7Days := now.AddDate(0, 0, -7) + 
last30Days := now.AddDate(0, 0, -30) + + totalInstances := 0 + activeInstances := 0 + versions := make(map[string]int) + osDistribution := make(map[string]int) + deployments := make(map[string]int) + + for _, instance := range s.instances { + if instance.ReceivedAt.After(last30Days) { + totalInstances++ + } + if instance.ReceivedAt.After(last7Days) { + activeInstances++ + versions[instance.TelemetryData.Version]++ + osDistribution[instance.TelemetryData.Os]++ + deployments[instance.TelemetryData.Deployment]++ + } + } + + // Update Prometheus gauges + s.totalClusters.Set(float64(totalInstances)) + s.activeClusters.Set(float64(activeInstances)) + + // Update cached stats for API + s.stats = map[string]interface{}{ + "total_instances": totalInstances, + "active_instances": activeInstances, + "versions": versions, + "os_distribution": osDistribution, + "deployments": deployments, + } +} + +// CleanupOldInstances removes instances older than the specified duration +func (s *PrometheusStorage) CleanupOldInstances(maxAge time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + + cutoff := time.Now().Add(-maxAge) + for instanceID, instance := range s.instances { + if instance.ReceivedAt.Before(cutoff) { + delete(s.instances, instanceID) + + // Remove from Prometheus metrics + labels := prometheus.Labels{ + "cluster_id": instance.TelemetryData.ClusterId, + "version": instance.TelemetryData.Version, + "os": instance.TelemetryData.Os, + "deployment": instance.TelemetryData.Deployment, + } + s.volumeServerCount.Delete(labels) + s.totalDiskBytes.Delete(labels) + s.totalVolumeCount.Delete(labels) + s.filerCount.Delete(labels) + s.brokerCount.Delete(labels) + } + } + + s.updateStats() +} |
