aboutsummaryrefslogtreecommitdiff
path: root/telemetry/test
diff options
context:
space:
mode:
Diffstat (limited to 'telemetry/test')
-rw-r--r--telemetry/test/integration.go315
1 files changed, 315 insertions, 0 deletions
diff --git a/telemetry/test/integration.go b/telemetry/test/integration.go
new file mode 100644
index 000000000..d0eea4777
--- /dev/null
+++ b/telemetry/test/integration.go
@@ -0,0 +1,315 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/telemetry/proto"
+ "github.com/seaweedfs/seaweedfs/weed/telemetry"
+ protobuf "google.golang.org/protobuf/proto"
+)
+
+const (
+ serverPort = "18080" // Use different port to avoid conflicts
+ serverURL = "http://localhost:" + serverPort
+)
+
+func main() {
+ fmt.Println("๐Ÿงช Starting SeaweedFS Telemetry Integration Test")
+
+ // Start telemetry server
+ fmt.Println("๐Ÿ“ก Starting telemetry server...")
+ serverCmd, err := startTelemetryServer()
+ if err != nil {
+ log.Fatalf("โŒ Failed to start telemetry server: %v", err)
+ }
+ defer stopServer(serverCmd)
+
+ // Wait for server to start
+ if !waitForServer(serverURL+"/health", 15*time.Second) {
+ log.Fatal("โŒ Telemetry server failed to start")
+ }
+ fmt.Println("โœ… Telemetry server started successfully")
+
+ // Test protobuf marshaling first
+ fmt.Println("๐Ÿ”ง Testing protobuf marshaling...")
+ if err := testProtobufMarshaling(); err != nil {
+ log.Fatalf("โŒ Protobuf marshaling test failed: %v", err)
+ }
+ fmt.Println("โœ… Protobuf marshaling test passed")
+
+ // Test protobuf client
+ fmt.Println("๐Ÿ”„ Testing protobuf telemetry client...")
+ if err := testTelemetryClient(); err != nil {
+ log.Fatalf("โŒ Telemetry client test failed: %v", err)
+ }
+ fmt.Println("โœ… Telemetry client test passed")
+
+ // Test server metrics endpoint
+ fmt.Println("๐Ÿ“Š Testing Prometheus metrics endpoint...")
+ if err := testMetricsEndpoint(); err != nil {
+ log.Fatalf("โŒ Metrics endpoint test failed: %v", err)
+ }
+ fmt.Println("โœ… Metrics endpoint test passed")
+
+ // Test stats API
+ fmt.Println("๐Ÿ“ˆ Testing stats API...")
+ if err := testStatsAPI(); err != nil {
+ log.Fatalf("โŒ Stats API test failed: %v", err)
+ }
+ fmt.Println("โœ… Stats API test passed")
+
+ // Test instances API
+ fmt.Println("๐Ÿ“‹ Testing instances API...")
+ if err := testInstancesAPI(); err != nil {
+ log.Fatalf("โŒ Instances API test failed: %v", err)
+ }
+ fmt.Println("โœ… Instances API test passed")
+
+ fmt.Println("๐ŸŽ‰ All telemetry integration tests passed!")
+}
+
+func startTelemetryServer() (*exec.Cmd, error) {
+ // Get the directory where this test is running
+ testDir, err := os.Getwd()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get working directory: %v", err)
+ }
+
+ // Navigate to the server directory (from main seaweedfs directory)
+ serverDir := filepath.Join(testDir, "telemetry", "server")
+
+ cmd := exec.Command("go", "run", ".",
+ "-port="+serverPort,
+ "-dashboard=false",
+ "-cleanup=1m",
+ "-max-age=1h")
+
+ cmd.Dir = serverDir
+
+ // Create log files for server output
+ logFile, err := os.Create("telemetry-server-test.log")
+ if err != nil {
+ return nil, fmt.Errorf("failed to create log file: %v", err)
+ }
+
+ cmd.Stdout = logFile
+ cmd.Stderr = logFile
+
+ if err := cmd.Start(); err != nil {
+ return nil, fmt.Errorf("failed to start server: %v", err)
+ }
+
+ return cmd, nil
+}
+
+func stopServer(cmd *exec.Cmd) {
+ if cmd != nil && cmd.Process != nil {
+ cmd.Process.Signal(syscall.SIGTERM)
+ cmd.Wait()
+
+ // Clean up log file
+ os.Remove("telemetry-server-test.log")
+ }
+}
+
+func waitForServer(url string, timeout time.Duration) bool {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ fmt.Printf("โณ Waiting for server at %s...\n", url)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ resp, err := http.Get(url)
+ if err == nil {
+ resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ return true
+ }
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+}
+
+func testProtobufMarshaling() error {
+ // Test protobuf marshaling/unmarshaling
+ testData := &proto.TelemetryData{
+ ClusterId: "test-cluster-12345",
+ Version: "test-3.45",
+ Os: "linux/amd64",
+ Features: []string{"filer", "s3api"},
+ Deployment: "test",
+ VolumeServerCount: 2,
+ TotalDiskBytes: 1000000,
+ TotalVolumeCount: 10,
+ FilerCount: 1,
+ BrokerCount: 1,
+ Timestamp: time.Now().Unix(),
+ }
+
+ // Marshal
+ data, err := protobuf.Marshal(testData)
+ if err != nil {
+ return fmt.Errorf("failed to marshal protobuf: %v", err)
+ }
+
+ fmt.Printf(" Protobuf size: %d bytes\n", len(data))
+
+ // Unmarshal
+ testData2 := &proto.TelemetryData{}
+ if err := protobuf.Unmarshal(data, testData2); err != nil {
+ return fmt.Errorf("failed to unmarshal protobuf: %v", err)
+ }
+
+ // Verify data
+ if testData2.ClusterId != testData.ClusterId {
+ return fmt.Errorf("protobuf data mismatch: expected %s, got %s",
+ testData.ClusterId, testData2.ClusterId)
+ }
+
+ if testData2.VolumeServerCount != testData.VolumeServerCount {
+ return fmt.Errorf("volume server count mismatch: expected %d, got %d",
+ testData.VolumeServerCount, testData2.VolumeServerCount)
+ }
+
+ return nil
+}
+
+func testTelemetryClient() error {
+ // Create telemetry client
+ client := telemetry.NewClient(serverURL+"/api/collect", true)
+
+ // Create test data using protobuf format
+ testData := &proto.TelemetryData{
+ Version: "test-3.45",
+ Os: "linux/amd64",
+ Features: []string{"filer", "s3api", "mq"},
+ Deployment: "integration-test",
+ VolumeServerCount: 3,
+ TotalDiskBytes: 1073741824, // 1GB
+ TotalVolumeCount: 50,
+ FilerCount: 2,
+ BrokerCount: 1,
+ Timestamp: time.Now().Unix(),
+ }
+
+ // Send telemetry data
+ if err := client.SendTelemetry(testData); err != nil {
+ return fmt.Errorf("failed to send telemetry: %v", err)
+ }
+
+ fmt.Printf(" Sent telemetry for cluster: %s\n", client.GetInstanceID())
+
+ // Wait a bit for processing
+ time.Sleep(2 * time.Second)
+
+ return nil
+}
+
+func testMetricsEndpoint() error {
+ resp, err := http.Get(serverURL + "/metrics")
+ if err != nil {
+ return fmt.Errorf("failed to get metrics: %v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("metrics endpoint returned status %d", resp.StatusCode)
+ }
+
+ // Read response and check for expected metrics
+ content, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read metrics response: %v", err)
+ }
+
+ contentStr := string(content)
+ expectedMetrics := []string{
+ "seaweedfs_telemetry_total_clusters",
+ "seaweedfs_telemetry_active_clusters",
+ "seaweedfs_telemetry_reports_received_total",
+ "seaweedfs_telemetry_volume_servers",
+ "seaweedfs_telemetry_disk_bytes",
+ "seaweedfs_telemetry_volume_count",
+ "seaweedfs_telemetry_filer_count",
+ "seaweedfs_telemetry_broker_count",
+ }
+
+ for _, metric := range expectedMetrics {
+ if !strings.Contains(contentStr, metric) {
+ return fmt.Errorf("missing expected metric: %s", metric)
+ }
+ }
+
+ // Check that we have at least one report received
+ if !strings.Contains(contentStr, "seaweedfs_telemetry_reports_received_total 1") {
+ fmt.Printf(" Warning: Expected at least 1 report received, metrics content:\n%s\n", contentStr)
+ }
+
+ fmt.Printf(" Found %d expected metrics\n", len(expectedMetrics))
+
+ return nil
+}
+
+func testStatsAPI() error {
+ resp, err := http.Get(serverURL + "/api/stats")
+ if err != nil {
+ return fmt.Errorf("failed to get stats: %v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("stats API returned status %d", resp.StatusCode)
+ }
+
+ // Read and verify JSON response
+ content, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read stats response: %v", err)
+ }
+
+ contentStr := string(content)
+ if !strings.Contains(contentStr, "total_instances") {
+ return fmt.Errorf("stats response missing total_instances field")
+ }
+
+ fmt.Printf(" Stats response: %s\n", contentStr)
+
+ return nil
+}
+
+func testInstancesAPI() error {
+ resp, err := http.Get(serverURL + "/api/instances?limit=10")
+ if err != nil {
+ return fmt.Errorf("failed to get instances: %v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("instances API returned status %d", resp.StatusCode)
+ }
+
+ // Read response
+ content, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read instances response: %v", err)
+ }
+
+ fmt.Printf(" Instances response length: %d bytes\n", len(content))
+
+ return nil
+}