diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-06-28 14:11:55 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-28 14:11:55 -0700 |
| commit | a1aab8a083111dd9357c7f35397fdf610f23cb6d (patch) | |
| tree | 66b56b09bec60cd1962236b7aad43a37011450b9 /telemetry/test | |
| parent | 29892c43ff95ad957c0f64ad5cd11e0d43e616e2 (diff) | |
| download | seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.tar.xz seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.zip | |
add telemetry (#6926)
* add telemetry
* fix go mod
* add default telemetry server url
* Update README.md
* replace with broker count instead of s3 count
* Update telemetry.pb.go
* github action to deploy
Diffstat (limited to 'telemetry/test')
| -rw-r--r-- | telemetry/test/integration.go | 315 |
1 files changed, 315 insertions, 0 deletions
diff --git a/telemetry/test/integration.go b/telemetry/test/integration.go new file mode 100644 index 000000000..d0eea4777 --- /dev/null +++ b/telemetry/test/integration.go @@ -0,0 +1,315 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/seaweedfs/seaweedfs/telemetry/proto" + "github.com/seaweedfs/seaweedfs/weed/telemetry" + protobuf "google.golang.org/protobuf/proto" +) + +const ( + serverPort = "18080" // Use different port to avoid conflicts + serverURL = "http://localhost:" + serverPort +) + +func main() { + fmt.Println("๐งช Starting SeaweedFS Telemetry Integration Test") + + // Start telemetry server + fmt.Println("๐ก Starting telemetry server...") + serverCmd, err := startTelemetryServer() + if err != nil { + log.Fatalf("โ Failed to start telemetry server: %v", err) + } + defer stopServer(serverCmd) + + // Wait for server to start + if !waitForServer(serverURL+"/health", 15*time.Second) { + log.Fatal("โ Telemetry server failed to start") + } + fmt.Println("โ
Telemetry server started successfully") + + // Test protobuf marshaling first + fmt.Println("๐ง Testing protobuf marshaling...") + if err := testProtobufMarshaling(); err != nil { + log.Fatalf("โ Protobuf marshaling test failed: %v", err) + } + fmt.Println("โ
Protobuf marshaling test passed") + + // Test protobuf client + fmt.Println("๐ Testing protobuf telemetry client...") + if err := testTelemetryClient(); err != nil { + log.Fatalf("โ Telemetry client test failed: %v", err) + } + fmt.Println("โ
Telemetry client test passed") + + // Test server metrics endpoint + fmt.Println("๐ Testing Prometheus metrics endpoint...") + if err := testMetricsEndpoint(); err != nil { + log.Fatalf("โ Metrics endpoint test failed: %v", err) + } + fmt.Println("โ
Metrics endpoint test passed") + + // Test stats API + fmt.Println("๐ Testing stats API...") + if err := testStatsAPI(); err != nil { + log.Fatalf("โ Stats API test failed: %v", err) + } + fmt.Println("โ
Stats API test passed") + + // Test instances API + fmt.Println("๐ Testing instances API...") + if err := testInstancesAPI(); err != nil { + log.Fatalf("โ Instances API test failed: %v", err) + } + fmt.Println("โ
Instances API test passed") + + fmt.Println("๐ All telemetry integration tests passed!") +} + +func startTelemetryServer() (*exec.Cmd, error) { + // Get the directory where this test is running + testDir, err := os.Getwd() + if err != nil { + return nil, fmt.Errorf("failed to get working directory: %v", err) + } + + // Navigate to the server directory (from main seaweedfs directory) + serverDir := filepath.Join(testDir, "telemetry", "server") + + cmd := exec.Command("go", "run", ".", + "-port="+serverPort, + "-dashboard=false", + "-cleanup=1m", + "-max-age=1h") + + cmd.Dir = serverDir + + // Create log files for server output + logFile, err := os.Create("telemetry-server-test.log") + if err != nil { + return nil, fmt.Errorf("failed to create log file: %v", err) + } + + cmd.Stdout = logFile + cmd.Stderr = logFile + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start server: %v", err) + } + + return cmd, nil +} + +func stopServer(cmd *exec.Cmd) { + if cmd != nil && cmd.Process != nil { + cmd.Process.Signal(syscall.SIGTERM) + cmd.Wait() + + // Clean up log file + os.Remove("telemetry-server-test.log") + } +} + +func waitForServer(url string, timeout time.Duration) bool { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + fmt.Printf("โณ Waiting for server at %s...\n", url) + + for { + select { + case <-ctx.Done(): + return false + default: + resp, err := http.Get(url) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return true + } + } + time.Sleep(500 * time.Millisecond) + } + } +} + +func testProtobufMarshaling() error { + // Test protobuf marshaling/unmarshaling + testData := &proto.TelemetryData{ + ClusterId: "test-cluster-12345", + Version: "test-3.45", + Os: "linux/amd64", + Features: []string{"filer", "s3api"}, + Deployment: "test", + VolumeServerCount: 2, + TotalDiskBytes: 1000000, + TotalVolumeCount: 10, + FilerCount: 1, + BrokerCount: 1, + Timestamp: time.Now().Unix(), + } + + // Marshal + data, err := protobuf.Marshal(testData) + if err != nil { + return fmt.Errorf("failed to marshal protobuf: %v", err) + } + + fmt.Printf(" Protobuf size: %d bytes\n", len(data)) + + // Unmarshal + testData2 := &proto.TelemetryData{} + if err := protobuf.Unmarshal(data, testData2); err != nil { + return fmt.Errorf("failed to unmarshal protobuf: %v", err) + } + + // Verify data + if testData2.ClusterId != testData.ClusterId { + return fmt.Errorf("protobuf data mismatch: expected %s, got %s", + testData.ClusterId, testData2.ClusterId) + } + + if testData2.VolumeServerCount != testData.VolumeServerCount { + return fmt.Errorf("volume server count mismatch: expected %d, got %d", + testData.VolumeServerCount, testData2.VolumeServerCount) + } + + return nil +} + +func testTelemetryClient() error { + // Create telemetry client + client := telemetry.NewClient(serverURL+"/api/collect", true) + + // Create test data using protobuf format + testData := &proto.TelemetryData{ + Version: "test-3.45", + Os: "linux/amd64", + Features: []string{"filer", "s3api", "mq"}, + Deployment: "integration-test", + VolumeServerCount: 3, + TotalDiskBytes: 1073741824, // 1GB + TotalVolumeCount: 50, + FilerCount: 2, + BrokerCount: 1, + Timestamp: time.Now().Unix(), + } + + // Send telemetry data + if err := client.SendTelemetry(testData); err != nil { + return fmt.Errorf("failed to send telemetry: %v", err) + } + + fmt.Printf(" Sent telemetry for cluster: %s\n", client.GetInstanceID()) + + // Wait a bit for processing + time.Sleep(2 * time.Second) + + return nil +} + +func testMetricsEndpoint() error { + resp, err := http.Get(serverURL + "/metrics") + if err != nil { + return fmt.Errorf("failed to get metrics: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("metrics endpoint returned status %d", resp.StatusCode) + } + + // Read response and check for expected metrics + content, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read metrics response: %v", err) + } + + contentStr := string(content) + expectedMetrics := []string{ + "seaweedfs_telemetry_total_clusters", + "seaweedfs_telemetry_active_clusters", + "seaweedfs_telemetry_reports_received_total", + "seaweedfs_telemetry_volume_servers", + "seaweedfs_telemetry_disk_bytes", + "seaweedfs_telemetry_volume_count", + "seaweedfs_telemetry_filer_count", + "seaweedfs_telemetry_broker_count", + } + + for _, metric := range expectedMetrics { + if !strings.Contains(contentStr, metric) { + return fmt.Errorf("missing expected metric: %s", metric) + } + } + + // Check that we have at least one report received + if !strings.Contains(contentStr, "seaweedfs_telemetry_reports_received_total 1") { + fmt.Printf(" Warning: Expected at least 1 report received, metrics content:\n%s\n", contentStr) + } + + fmt.Printf(" Found %d expected metrics\n", len(expectedMetrics)) + + return nil +} + +func testStatsAPI() error { + resp, err := http.Get(serverURL + "/api/stats") + if err != nil { + return fmt.Errorf("failed to get stats: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("stats API returned status %d", resp.StatusCode) + } + + // Read and verify JSON response + content, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read stats response: %v", err) + } + + contentStr := string(content) + if !strings.Contains(contentStr, "total_instances") { + return fmt.Errorf("stats response missing total_instances field") + } + + fmt.Printf(" Stats response: %s\n", contentStr) + + return nil +} + +func testInstancesAPI() error { + resp, err := http.Get(serverURL + "/api/instances?limit=10") + if err != nil { + return fmt.Errorf("failed to get instances: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("instances API returned status %d", resp.StatusCode) + } + + // Read response + content, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read instances response: %v", err) + } + + fmt.Printf(" Instances response length: %d bytes\n", len(content)) + + return nil +} |
