path: root/telemetry/server
author    Chris Lu <chrislusf@users.noreply.github.com>  2025-06-28 14:11:55 -0700
committer GitHub <noreply@github.com>                    2025-06-28 14:11:55 -0700
commit    a1aab8a083111dd9357c7f35397fdf610f23cb6d (patch)
tree      66b56b09bec60cd1962236b7aad43a37011450b9 /telemetry/server
parent    29892c43ff95ad957c0f64ad5cd11e0d43e616e2 (diff)
download  seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.tar.xz
          seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.zip
add telemetry (#6926)
* add telemetry
* fix go mod
* add default telemetry server url
* Update README.md
* replace with broker count instead of s3 count
* Update telemetry.pb.go
* github action to deploy
Diffstat (limited to 'telemetry/server')
-rw-r--r--  telemetry/server/Dockerfile               18
-rw-r--r--  telemetry/server/Makefile                 97
-rw-r--r--  telemetry/server/api/handlers.go         152
-rw-r--r--  telemetry/server/dashboard/dashboard.go  278
-rw-r--r--  telemetry/server/go.sum                   31
-rw-r--r--  telemetry/server/main.go                 111
-rw-r--r--  telemetry/server/storage/prometheus.go   245
7 files changed, 932 insertions, 0 deletions
diff --git a/telemetry/server/Dockerfile b/telemetry/server/Dockerfile
new file mode 100644
index 000000000..8f3782fcf
--- /dev/null
+++ b/telemetry/server/Dockerfile
@@ -0,0 +1,18 @@
+FROM golang:1.21-alpine AS builder
+
+WORKDIR /app
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . .
+RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o telemetry-server .
+
+FROM alpine:latest
+RUN apk --no-cache add ca-certificates
+WORKDIR /root/
+
+COPY --from=builder /app/telemetry-server .
+
+EXPOSE 8080
+
+CMD ["./telemetry-server"] \ No newline at end of file
diff --git a/telemetry/server/Makefile b/telemetry/server/Makefile
new file mode 100644
index 000000000..cf57f1777
--- /dev/null
+++ b/telemetry/server/Makefile
@@ -0,0 +1,97 @@
+.PHONY: build run clean test deps proto integration-test test-all
+
+# Build the telemetry server
+build:
+ go build -o telemetry-server .
+
+# Run the server in development mode
+run:
+ go run . -port=8080 -dashboard=true -cleanup=1h -max-age=24h
+
+# Run the server in production mode
+run-prod:
+ ./telemetry-server -port=8080 -dashboard=true -cleanup=24h -max-age=720h
+
+# Clean build artifacts
+clean:
+ rm -f telemetry-server
+ rm -f ../test/telemetry-server-test.log
+ go clean
+
+# Run unit tests
+test:
+ go test ./...
+
+# Run integration tests
+integration-test:
+ @echo "๐Ÿงช Running telemetry integration tests..."
+ cd ../../ && go run telemetry/test/integration.go
+
+# Run all tests (unit + integration)
+test-all: test integration-test
+
+# Install dependencies
+deps:
+ go mod download
+ go mod tidy
+
+# Generate protobuf code (requires protoc)
+proto:
+ cd .. && protoc --go_out=. --go_opt=paths=source_relative proto/telemetry.proto
+
+# Build Docker image
+docker-build:
+ docker build -t seaweedfs-telemetry .
+
+# Run with Docker
+docker-run:
+ docker run -p 8080:8080 seaweedfs-telemetry -port=8080 -dashboard=true
+
+# Development with auto-reload (requires air: go install github.com/cosmtrek/air@latest)
+dev:
+ air
+
+# Check if protoc is available
+check-protoc:
+ @which protoc > /dev/null || (echo "protoc is required for proto generation. Install from https://grpc.io/docs/protoc-installation/" && exit 1)
+
+# Full development setup
+setup: check-protoc deps proto build
+
+# Run a quick smoke test
+smoke-test: build
+ @echo "๐Ÿ”ฅ Running smoke test..."
+ @timeout 10s ./telemetry-server -port=18081 > /dev/null 2>&1 & \
+ SERVER_PID=$$!; \
+ sleep 2; \
+ if curl -s http://localhost:18081/health > /dev/null; then \
+ echo "โœ… Smoke test passed - server responds to health check"; \
+ else \
+ echo "โŒ Smoke test failed - server not responding"; \
+ exit 1; \
+ fi; \
+ kill $$SERVER_PID 2>/dev/null || true
+
+# Continuous integration target
+ci: deps proto build test integration-test
+ @echo "๐ŸŽ‰ All CI tests passed!"
+
+# Help
+help:
+ @echo "Available targets:"
+ @echo " build - Build the telemetry server binary"
+ @echo " run - Run server in development mode"
+ @echo " run-prod - Run server in production mode"
+ @echo " clean - Clean build artifacts"
+ @echo " test - Run unit tests"
+ @echo " integration-test- Run integration tests"
+ @echo " test-all - Run all tests (unit + integration)"
+ @echo " deps - Install Go dependencies"
+ @echo " proto - Generate protobuf code"
+ @echo " docker-build - Build Docker image"
+ @echo " docker-run - Run with Docker"
+ @echo " dev - Run with auto-reload (requires air)"
+ @echo " smoke-test - Quick server health check"
+ @echo " setup - Full development setup"
+ @echo " ci - Continuous integration (all tests)"
+ @echo " help - Show this help" \ No newline at end of file
diff --git a/telemetry/server/api/handlers.go b/telemetry/server/api/handlers.go
new file mode 100644
index 000000000..0ff00330b
--- /dev/null
+++ b/telemetry/server/api/handlers.go
@@ -0,0 +1,152 @@
+package api
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/telemetry/proto"
+ "github.com/seaweedfs/seaweedfs/telemetry/server/storage"
+ protobuf "google.golang.org/protobuf/proto"
+)
+
+type Handler struct {
+ storage *storage.PrometheusStorage
+}
+
+func NewHandler(storage *storage.PrometheusStorage) *Handler {
+ return &Handler{storage: storage}
+}
+
+func (h *Handler) CollectTelemetry(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPost {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ contentType := r.Header.Get("Content-Type")
+
+ // Only accept protobuf content type
+ if contentType != "application/x-protobuf" && contentType != "application/protobuf" {
+ http.Error(w, "Content-Type must be application/x-protobuf", http.StatusUnsupportedMediaType)
+ return
+ }
+
+ // Read protobuf request
+ body, err := io.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, "Failed to read request body", http.StatusBadRequest)
+ return
+ }
+
+ req := &proto.TelemetryRequest{}
+ if err := protobuf.Unmarshal(body, req); err != nil {
+ http.Error(w, "Invalid protobuf data", http.StatusBadRequest)
+ return
+ }
+
+ data := req.Data
+ if data == nil {
+ http.Error(w, "Missing telemetry data", http.StatusBadRequest)
+ return
+ }
+
+ // Validate required fields
+ if data.ClusterId == "" || data.Version == "" || data.Os == "" {
+ http.Error(w, "Missing required fields", http.StatusBadRequest)
+ return
+ }
+
+ // Set timestamp if not provided
+ if data.Timestamp == 0 {
+ data.Timestamp = time.Now().Unix()
+ }
+
+ // Store the telemetry data
+ if err := h.storage.StoreTelemetry(data); err != nil {
+ http.Error(w, "Failed to store data", http.StatusInternalServerError)
+ return
+ }
+
+ // Return protobuf response
+ resp := &proto.TelemetryResponse{
+ Success: true,
+ Message: "Telemetry data received",
+ }
+
+ respData, err := protobuf.Marshal(resp)
+ if err != nil {
+ http.Error(w, "Failed to marshal response", http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/x-protobuf")
+ w.WriteHeader(http.StatusOK)
+ w.Write(respData)
+}
+
+func (h *Handler) GetStats(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ stats, err := h.storage.GetStats()
+ if err != nil {
+ http.Error(w, "Failed to get stats", http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(stats)
+}
+
+func (h *Handler) GetInstances(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ limitStr := r.URL.Query().Get("limit")
+ limit := 100 // default
+ if limitStr != "" {
+ if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
+ limit = l
+ }
+ }
+
+ instances, err := h.storage.GetInstances(limit)
+ if err != nil {
+ http.Error(w, "Failed to get instances", http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(instances)
+}
+
+func (h *Handler) GetMetrics(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ daysStr := r.URL.Query().Get("days")
+ days := 30 // default
+ if daysStr != "" {
+ if d, err := strconv.Atoi(daysStr); err == nil && d > 0 && d <= 365 {
+ days = d
+ }
+ }
+
+ metrics, err := h.storage.GetMetrics(days)
+ if err != nil {
+ http.Error(w, "Failed to get metrics", http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(metrics)
+}
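
For reference, a minimal client round-trip against /api/collect might look like the sketch below. It is an illustration, not part of the patch: it assumes the generated telemetry/proto package exposes the field names the handler reads (ClusterId, Version, Os, Timestamp) and that the server listens on localhost:8080; the concrete values are made up.

    package main

    import (
        "bytes"
        "fmt"
        "log"
        "net/http"
        "time"

        "github.com/seaweedfs/seaweedfs/telemetry/proto"
        protobuf "google.golang.org/protobuf/proto"
    )

    func main() {
        // ClusterId, Version and Os are the fields CollectTelemetry requires.
        req := &proto.TelemetryRequest{
            Data: &proto.TelemetryData{
                ClusterId: "example-cluster", // hypothetical values
                Version:   "3.80",
                Os:        "linux/amd64",
                Timestamp: time.Now().Unix(),
            },
        }
        body, err := protobuf.Marshal(req)
        if err != nil {
            log.Fatal(err)
        }

        // The handler only accepts the protobuf content type.
        resp, err := http.Post("http://localhost:8080/api/collect",
            "application/x-protobuf", bytes.NewReader(body))
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()
        fmt.Println("status:", resp.StatusCode)
    }
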
diff --git a/telemetry/server/dashboard/dashboard.go b/telemetry/server/dashboard/dashboard.go
new file mode 100644
index 000000000..9a56c7f1b
--- /dev/null
+++ b/telemetry/server/dashboard/dashboard.go
@@ -0,0 +1,278 @@
+package dashboard
+
+import (
+ "net/http"
+)
+
+type Handler struct{}
+
+func NewHandler() *Handler {
+ return &Handler{}
+}
+
+func (h *Handler) ServeIndex(w http.ResponseWriter, r *http.Request) {
+ html := `<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>SeaweedFS Telemetry Dashboard</title>
+ <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
+ <style>
+ body {
+ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
+ margin: 0;
+ padding: 20px;
+ background-color: #f5f5f5;
+ }
+ .container {
+ max-width: 1200px;
+ margin: 0 auto;
+ }
+ .header {
+ background: white;
+ padding: 20px;
+ border-radius: 8px;
+ margin-bottom: 20px;
+ box-shadow: 0 2px 4px rgba(0,0,0,0.1);
+ }
+ .stats-grid {
+ display: grid;
+ grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
+ gap: 20px;
+ margin-bottom: 20px;
+ }
+ .stat-card {
+ background: white;
+ padding: 20px;
+ border-radius: 8px;
+ box-shadow: 0 2px 4px rgba(0,0,0,0.1);
+ }
+ .stat-value {
+ font-size: 2em;
+ font-weight: bold;
+ color: #2196F3;
+ }
+ .stat-label {
+ color: #666;
+ margin-top: 5px;
+ }
+ .chart-container {
+ background: white;
+ padding: 20px;
+ border-radius: 8px;
+ margin-bottom: 20px;
+ box-shadow: 0 2px 4px rgba(0,0,0,0.1);
+ }
+ .chart-title {
+ font-size: 1.2em;
+ font-weight: bold;
+ margin-bottom: 15px;
+ }
+ .loading {
+ text-align: center;
+ padding: 40px;
+ color: #666;
+ }
+ .error {
+ background: #ffebee;
+ color: #c62828;
+ padding: 15px;
+ border-radius: 4px;
+ margin: 10px 0;
+ }
+ </style>
+</head>
+<body>
+ <div class="container">
+ <div class="header">
+ <h1>SeaweedFS Telemetry Dashboard</h1>
+ <p>Privacy-respecting usage analytics for SeaweedFS</p>
+ </div>
+
+ <div id="loading" class="loading">Loading telemetry data...</div>
+ <div id="error" class="error" style="display: none;"></div>
+
+ <div id="dashboard" style="display: none;">
+ <div class="stats-grid">
+ <div class="stat-card">
+ <div class="stat-value" id="totalInstances">-</div>
+ <div class="stat-label">Total Instances (30 days)</div>
+ </div>
+ <div class="stat-card">
+ <div class="stat-value" id="activeInstances">-</div>
+ <div class="stat-label">Active Instances (7 days)</div>
+ </div>
+ <div class="stat-card">
+ <div class="stat-value" id="totalVersions">-</div>
+ <div class="stat-label">Different Versions</div>
+ </div>
+ <div class="stat-card">
+ <div class="stat-value" id="totalOS">-</div>
+ <div class="stat-label">Operating Systems</div>
+ </div>
+ </div>
+
+ <div class="chart-container">
+ <div class="chart-title">Version Distribution</div>
+ <canvas id="versionChart" width="400" height="200"></canvas>
+ </div>
+
+ <div class="chart-container">
+ <div class="chart-title">Operating System Distribution</div>
+ <canvas id="osChart" width="400" height="200"></canvas>
+ </div>
+
+ <div class="chart-container">
+ <div class="chart-title">Deployment Types</div>
+ <canvas id="deploymentChart" width="400" height="200"></canvas>
+ </div>
+
+ <div class="chart-container">
+ <div class="chart-title">Volume Servers Over Time</div>
+ <canvas id="serverChart" width="400" height="200"></canvas>
+ </div>
+
+ <div class="chart-container">
+ <div class="chart-title">Total Disk Usage Over Time</div>
+ <canvas id="diskChart" width="400" height="200"></canvas>
+ </div>
+ </div>
+ </div>
+
+ <script>
+ let charts = {};
+
+ async function loadDashboard() {
+ try {
+ // Load stats
+ const statsResponse = await fetch('/api/stats');
+ const stats = await statsResponse.json();
+
+ // Load metrics
+ const metricsResponse = await fetch('/api/metrics?days=30');
+ const metrics = await metricsResponse.json();
+
+ updateStats(stats);
+ updateCharts(stats, metrics);
+
+ document.getElementById('loading').style.display = 'none';
+ document.getElementById('dashboard').style.display = 'block';
+ } catch (error) {
+ console.error('Error loading dashboard:', error);
+ showError('Failed to load telemetry data: ' + error.message);
+ }
+ }
+
+ function updateStats(stats) {
+ document.getElementById('totalInstances').textContent = stats.total_instances || 0;
+ document.getElementById('activeInstances').textContent = stats.active_instances || 0;
+ document.getElementById('totalVersions').textContent = Object.keys(stats.versions || {}).length;
+ document.getElementById('totalOS').textContent = Object.keys(stats.os_distribution || {}).length;
+ }
+
+ function updateCharts(stats, metrics) {
+ // Version chart
+ createPieChart('versionChart', 'Version Distribution', stats.versions || {});
+
+ // OS chart
+ createPieChart('osChart', 'Operating System Distribution', stats.os_distribution || {});
+
+ // Deployment chart
+ createPieChart('deploymentChart', 'Deployment Types', stats.deployments || {});
+
+ // Server count over time (the API returns arrays of {date, value} points)
+ if (metrics.volume_servers) {
+ createLineChart('serverChart', 'Volume Servers', metrics.volume_servers.map(p => p.date), metrics.volume_servers.map(p => p.value), '#2196F3');
+ }
+
+ // Disk usage over time
+ if (metrics.disk_usage) {
+ const diskUsageGB = metrics.disk_usage.map(p => Math.round(p.value / (1024 * 1024 * 1024)));
+ createLineChart('diskChart', 'Disk Usage (GB)', metrics.disk_usage.map(p => p.date), diskUsageGB, '#4CAF50');
+ }
+ }
+
+ function createPieChart(canvasId, title, data) {
+ const ctx = document.getElementById(canvasId).getContext('2d');
+
+ if (charts[canvasId]) {
+ charts[canvasId].destroy();
+ }
+
+ const labels = Object.keys(data);
+ const values = Object.values(data);
+
+ charts[canvasId] = new Chart(ctx, {
+ type: 'pie',
+ data: {
+ labels: labels,
+ datasets: [{
+ data: values,
+ backgroundColor: [
+ '#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0',
+ '#9966FF', '#FF9F40', '#FF6384', '#C9CBCF'
+ ]
+ }]
+ },
+ options: {
+ responsive: true,
+ plugins: {
+ legend: {
+ position: 'bottom'
+ }
+ }
+ }
+ });
+ }
+
+ function createLineChart(canvasId, label, labels, data, color) {
+ const ctx = document.getElementById(canvasId).getContext('2d');
+
+ if (charts[canvasId]) {
+ charts[canvasId].destroy();
+ }
+
+ charts[canvasId] = new Chart(ctx, {
+ type: 'line',
+ data: {
+ labels: labels,
+ datasets: [{
+ label: label,
+ data: data,
+ borderColor: color,
+ backgroundColor: color + '20',
+ fill: true,
+ tension: 0.1
+ }]
+ },
+ options: {
+ responsive: true,
+ scales: {
+ y: {
+ beginAtZero: true
+ }
+ }
+ }
+ });
+ }
+
+ function showError(message) {
+ document.getElementById('loading').style.display = 'none';
+ document.getElementById('error').style.display = 'block';
+ document.getElementById('error').textContent = message;
+ }
+
+ // Load dashboard on page load
+ loadDashboard();
+
+ // Refresh every 5 minutes
+ setInterval(loadDashboard, 5 * 60 * 1000);
+ </script>
+</body>
+</html>`
+
+ w.Header().Set("Content-Type", "text/html")
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(html))
+}
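
The dashboard is a single static page whose JavaScript polls /api/stats and /api/metrics. The same payload can be consumed outside the browser; the sketch below (an illustration assuming the server runs on localhost:8080, not part of the patch) decodes the JSON keys the dashboard reads.

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"
    )

    // statsResponse mirrors the keys the dashboard JavaScript reads from /api/stats.
    type statsResponse struct {
        TotalInstances  int            `json:"total_instances"`
        ActiveInstances int            `json:"active_instances"`
        Versions        map[string]int `json:"versions"`
        OSDistribution  map[string]int `json:"os_distribution"`
        Deployments     map[string]int `json:"deployments"`
    }

    func main() {
        resp, err := http.Get("http://localhost:8080/api/stats")
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()

        var stats statsResponse
        if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%d active of %d total instances\n", stats.ActiveInstances, stats.TotalInstances)
    }
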
diff --git a/telemetry/server/go.sum b/telemetry/server/go.sum
new file mode 100644
index 000000000..0aec189da
--- /dev/null
+++ b/telemetry/server/go.sum
@@ -0,0 +1,31 @@
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
+github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
+github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
+github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
+github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
+github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
+github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
+github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
+github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
+github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
+github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
+golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
+google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
diff --git a/telemetry/server/main.go b/telemetry/server/main.go
new file mode 100644
index 000000000..6cbae05c7
--- /dev/null
+++ b/telemetry/server/main.go
@@ -0,0 +1,111 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/seaweedfs/seaweedfs/telemetry/server/api"
+ "github.com/seaweedfs/seaweedfs/telemetry/server/dashboard"
+ "github.com/seaweedfs/seaweedfs/telemetry/server/storage"
+)
+
+var (
+ port = flag.Int("port", 8080, "HTTP server port")
+ enableCORS = flag.Bool("cors", true, "Enable CORS for dashboard")
+ logRequests = flag.Bool("log", true, "Log incoming requests")
+ enableDashboard = flag.Bool("dashboard", true, "Enable built-in dashboard (optional when using Grafana)")
+ cleanupInterval = flag.Duration("cleanup", 24*time.Hour, "Cleanup interval for old instances")
+ maxInstanceAge = flag.Duration("max-age", 30*24*time.Hour, "Maximum age for instances before cleanup")
+)
+
+func main() {
+ flag.Parse()
+
+ // Create Prometheus storage instance
+ store := storage.NewPrometheusStorage()
+
+ // Start cleanup routine
+ go func() {
+ ticker := time.NewTicker(*cleanupInterval)
+ defer ticker.Stop()
+ for range ticker.C {
+ store.CleanupOldInstances(*maxInstanceAge)
+ }
+ }()
+
+ // Setup HTTP handlers
+ mux := http.NewServeMux()
+
+ // Prometheus metrics endpoint
+ mux.Handle("/metrics", promhttp.Handler())
+
+ // API endpoints
+ apiHandler := api.NewHandler(store)
+ mux.HandleFunc("/api/collect", corsMiddleware(logMiddleware(apiHandler.CollectTelemetry)))
+ mux.HandleFunc("/api/stats", corsMiddleware(logMiddleware(apiHandler.GetStats)))
+ mux.HandleFunc("/api/instances", corsMiddleware(logMiddleware(apiHandler.GetInstances)))
+ mux.HandleFunc("/api/metrics", corsMiddleware(logMiddleware(apiHandler.GetMetrics)))
+
+ // Dashboard (optional)
+ if *enableDashboard {
+ dashboardHandler := dashboard.NewHandler()
+ mux.HandleFunc("/", corsMiddleware(dashboardHandler.ServeIndex))
+ mux.HandleFunc("/dashboard", corsMiddleware(dashboardHandler.ServeIndex))
+ mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("./static"))))
+ }
+
+ // Health check
+ mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]string{
+ "status": "ok",
+ "time": time.Now().UTC().Format(time.RFC3339),
+ })
+ })
+
+ addr := fmt.Sprintf(":%d", *port)
+ log.Printf("Starting telemetry server on %s", addr)
+ log.Printf("Prometheus metrics: http://localhost%s/metrics", addr)
+ if *enableDashboard {
+ log.Printf("Dashboard: http://localhost%s/dashboard", addr)
+ }
+ log.Printf("Cleanup interval: %v, Max instance age: %v", *cleanupInterval, *maxInstanceAge)
+
+ if err := http.ListenAndServe(addr, mux); err != nil {
+ log.Fatalf("Server failed: %v", err)
+ }
+}
+
+func corsMiddleware(next http.HandlerFunc) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if *enableCORS {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
+ w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
+ }
+
+ if r.Method == "OPTIONS" {
+ w.WriteHeader(http.StatusOK)
+ return
+ }
+
+ next(w, r)
+ }
+}
+
+func logMiddleware(next http.HandlerFunc) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if *logRequests {
+ start := time.Now()
+ next(w, r)
+ log.Printf("%s %s %s %v", r.Method, r.URL.Path, r.RemoteAddr, time.Since(start))
+ } else {
+ next(w, r)
+ }
+ }
+}
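
As a rough illustration of how the middleware composes, a test like the sketch below could sit next to main.go in package main; it is an assumption about how one might exercise corsMiddleware, not code from this commit. With the default -cors=true flag, an OPTIONS preflight is answered directly and other requests fall through to the wrapped handler.

    package main

    import (
        "net/http"
        "net/http/httptest"
        "testing"
    )

    func TestCORSMiddleware(t *testing.T) {
        called := false
        h := corsMiddleware(func(w http.ResponseWriter, r *http.Request) { called = true })

        // An OPTIONS preflight should be answered with 200 without invoking next.
        rec := httptest.NewRecorder()
        h(rec, httptest.NewRequest(http.MethodOptions, "/api/collect", nil))
        if rec.Code != http.StatusOK || called {
            t.Fatalf("preflight: code=%d, wrapped handler called=%v", rec.Code, called)
        }
        if rec.Header().Get("Access-Control-Allow-Origin") != "*" {
            t.Fatal("expected CORS headers with the default -cors=true flag")
        }

        // A normal request should reach the wrapped handler.
        rec = httptest.NewRecorder()
        h(rec, httptest.NewRequest(http.MethodGet, "/api/stats", nil))
        if !called {
            t.Fatal("expected wrapped handler to be called")
        }
    }
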
diff --git a/telemetry/server/storage/prometheus.go b/telemetry/server/storage/prometheus.go
new file mode 100644
index 000000000..d25dd669a
--- /dev/null
+++ b/telemetry/server/storage/prometheus.go
@@ -0,0 +1,245 @@
+package storage
+
+import (
+ "encoding/json"
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/seaweedfs/seaweedfs/telemetry/proto"
+)
+
+type PrometheusStorage struct {
+ // Prometheus metrics
+ totalClusters prometheus.Gauge
+ activeClusters prometheus.Gauge
+ volumeServerCount *prometheus.GaugeVec
+ totalDiskBytes *prometheus.GaugeVec
+ totalVolumeCount *prometheus.GaugeVec
+ filerCount *prometheus.GaugeVec
+ brokerCount *prometheus.GaugeVec
+ clusterInfo *prometheus.GaugeVec
+ telemetryReceived prometheus.Counter
+
+ // In-memory storage for API endpoints (if needed)
+ mu sync.RWMutex
+ instances map[string]*telemetryData
+ stats map[string]interface{}
+}
+
+// telemetryData is an internal struct that includes the received timestamp
+type telemetryData struct {
+ *proto.TelemetryData
+ ReceivedAt time.Time `json:"received_at"`
+}
+
+func NewPrometheusStorage() *PrometheusStorage {
+ return &PrometheusStorage{
+ totalClusters: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_total_clusters",
+ Help: "Total number of unique SeaweedFS clusters (last 30 days)",
+ }),
+ activeClusters: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_active_clusters",
+ Help: "Number of active SeaweedFS clusters (last 7 days)",
+ }),
+ volumeServerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_volume_servers",
+ Help: "Number of volume servers per cluster",
+ }, []string{"cluster_id", "version", "os", "deployment"}),
+ totalDiskBytes: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_disk_bytes",
+ Help: "Total disk usage in bytes per cluster",
+ }, []string{"cluster_id", "version", "os", "deployment"}),
+ totalVolumeCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_volume_count",
+ Help: "Total number of volumes per cluster",
+ }, []string{"cluster_id", "version", "os", "deployment"}),
+ filerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_filer_count",
+ Help: "Number of filer servers per cluster",
+ }, []string{"cluster_id", "version", "os", "deployment"}),
+ brokerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_broker_count",
+ Help: "Number of broker servers per cluster",
+ }, []string{"cluster_id", "version", "os", "deployment"}),
+ clusterInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "seaweedfs_telemetry_cluster_info",
+ Help: "Cluster information (always 1, labels contain metadata)",
+ }, []string{"cluster_id", "version", "os", "deployment", "features"}),
+ telemetryReceived: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "seaweedfs_telemetry_reports_received_total",
+ Help: "Total number of telemetry reports received",
+ }),
+ instances: make(map[string]*telemetryData),
+ stats: make(map[string]interface{}),
+ }
+}
+
+func (s *PrometheusStorage) StoreTelemetry(data *proto.TelemetryData) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Update Prometheus metrics
+ labels := prometheus.Labels{
+ "cluster_id": data.ClusterId,
+ "version": data.Version,
+ "os": data.Os,
+ "deployment": data.Deployment,
+ }
+
+ s.volumeServerCount.With(labels).Set(float64(data.VolumeServerCount))
+ s.totalDiskBytes.With(labels).Set(float64(data.TotalDiskBytes))
+ s.totalVolumeCount.With(labels).Set(float64(data.TotalVolumeCount))
+ s.filerCount.With(labels).Set(float64(data.FilerCount))
+ s.brokerCount.With(labels).Set(float64(data.BrokerCount))
+
+ // Features as JSON string for the label
+ featuresJSON, _ := json.Marshal(data.Features)
+ infoLabels := prometheus.Labels{
+ "cluster_id": data.ClusterId,
+ "version": data.Version,
+ "os": data.Os,
+ "deployment": data.Deployment,
+ "features": string(featuresJSON),
+ }
+ s.clusterInfo.With(infoLabels).Set(1)
+
+ s.telemetryReceived.Inc()
+
+ // Store in memory for API endpoints
+ s.instances[data.ClusterId] = &telemetryData{
+ TelemetryData: data,
+ ReceivedAt: time.Now().UTC(),
+ }
+
+ // Update aggregated stats
+ s.updateStats()
+
+ return nil
+}
+
+func (s *PrometheusStorage) GetStats() (map[string]interface{}, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // Return cached stats
+ result := make(map[string]interface{})
+ for k, v := range s.stats {
+ result[k] = v
+ }
+ return result, nil
+}
+
+func (s *PrometheusStorage) GetInstances(limit int) ([]*telemetryData, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ var instances []*telemetryData
+ count := 0
+ for _, instance := range s.instances {
+ if count >= limit {
+ break
+ }
+ instances = append(instances, instance)
+ count++
+ }
+
+ return instances, nil
+}
+
+func (s *PrometheusStorage) GetMetrics(days int) (map[string]interface{}, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // Return current metrics from in-memory storage
+ // Historical data should be queried from Prometheus directly
+ cutoff := time.Now().AddDate(0, 0, -days)
+
+ var volumeServers []map[string]interface{}
+ var diskUsage []map[string]interface{}
+
+ for _, instance := range s.instances {
+ if instance.ReceivedAt.After(cutoff) {
+ volumeServers = append(volumeServers, map[string]interface{}{
+ "date": instance.ReceivedAt.Format("2006-01-02"),
+ "value": instance.TelemetryData.VolumeServerCount,
+ })
+ diskUsage = append(diskUsage, map[string]interface{}{
+ "date": instance.ReceivedAt.Format("2006-01-02"),
+ "value": instance.TelemetryData.TotalDiskBytes,
+ })
+ }
+ }
+
+ return map[string]interface{}{
+ "volume_servers": volumeServers,
+ "disk_usage": diskUsage,
+ }, nil
+}
+
+func (s *PrometheusStorage) updateStats() {
+ now := time.Now()
+ last7Days := now.AddDate(0, 0, -7)
+ last30Days := now.AddDate(0, 0, -30)
+
+ totalInstances := 0
+ activeInstances := 0
+ versions := make(map[string]int)
+ osDistribution := make(map[string]int)
+ deployments := make(map[string]int)
+
+ for _, instance := range s.instances {
+ if instance.ReceivedAt.After(last30Days) {
+ totalInstances++
+ }
+ if instance.ReceivedAt.After(last7Days) {
+ activeInstances++
+ versions[instance.TelemetryData.Version]++
+ osDistribution[instance.TelemetryData.Os]++
+ deployments[instance.TelemetryData.Deployment]++
+ }
+ }
+
+ // Update Prometheus gauges
+ s.totalClusters.Set(float64(totalInstances))
+ s.activeClusters.Set(float64(activeInstances))
+
+ // Update cached stats for API
+ s.stats = map[string]interface{}{
+ "total_instances": totalInstances,
+ "active_instances": activeInstances,
+ "versions": versions,
+ "os_distribution": osDistribution,
+ "deployments": deployments,
+ }
+}
+
+// CleanupOldInstances removes instances older than the specified duration
+func (s *PrometheusStorage) CleanupOldInstances(maxAge time.Duration) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ cutoff := time.Now().Add(-maxAge)
+ for instanceID, instance := range s.instances {
+ if instance.ReceivedAt.Before(cutoff) {
+ delete(s.instances, instanceID)
+
+ // Remove from Prometheus metrics
+ labels := prometheus.Labels{
+ "cluster_id": instance.TelemetryData.ClusterId,
+ "version": instance.TelemetryData.Version,
+ "os": instance.TelemetryData.Os,
+ "deployment": instance.TelemetryData.Deployment,
+ }
+ s.volumeServerCount.Delete(labels)
+ s.totalDiskBytes.Delete(labels)
+ s.totalVolumeCount.Delete(labels)
+ s.filerCount.Delete(labels)
+ s.brokerCount.Delete(labels)
+ }
+ }
+
+ s.updateStats()
+}
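
To show how the storage layer is meant to be driven, here is a small usage sketch (again an illustration, not part of the commit). Field names mirror the ones StoreTelemetry reads above; the concrete values are made up. Note that NewPrometheusStorage registers its collectors with the default Prometheus registry via promauto, so it should be constructed once per process.

    package main

    import (
        "fmt"
        "log"

        "github.com/seaweedfs/seaweedfs/telemetry/proto"
        "github.com/seaweedfs/seaweedfs/telemetry/server/storage"
    )

    func main() {
        // Construct once: the gauges are registered with the default registry.
        store := storage.NewPrometheusStorage()

        // Feeding one report in updates the Prometheus gauges, the in-memory
        // instance map, and the cached stats in a single call.
        err := store.StoreTelemetry(&proto.TelemetryData{
            ClusterId:         "example-cluster", // hypothetical values
            Version:           "3.80",
            Os:                "linux/amd64",
            Deployment:        "docker",
            VolumeServerCount: 3,
            TotalDiskBytes:    42 << 30,
        })
        if err != nil {
            log.Fatal(err)
        }

        stats, _ := store.GetStats()
        fmt.Println(stats["total_instances"], stats["versions"])
    }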