diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-06-28 14:11:55 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-28 14:11:55 -0700 |
| commit | a1aab8a083111dd9357c7f35397fdf610f23cb6d (patch) | |
| tree | 66b56b09bec60cd1962236b7aad43a37011450b9 /weed | |
| parent | 29892c43ff95ad957c0f64ad5cd11e0d43e616e2 (diff) | |
| download | seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.tar.xz seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.zip | |
add telemetry (#6926)
* add telemetry
* fix go mod
* add default telemetry server url
* Update README.md
* replace with broker count instead of s3 count
* Update telemetry.pb.go
* github action to deploy
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/master.go | 6 | ||||
| -rw-r--r-- | weed/command/server.go | 2 | ||||
| -rw-r--r-- | weed/server/master_server.go | 30 | ||||
| -rw-r--r-- | weed/telemetry/client.go | 100 | ||||
| -rw-r--r-- | weed/telemetry/collector.go | 218 |
5 files changed, 356 insertions, 0 deletions
diff --git a/weed/command/master.go b/weed/command/master.go index a8cdf76c6..0761687d9 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -61,6 +61,8 @@ type MasterOptions struct { electionTimeout *time.Duration raftHashicorp *bool raftBootstrap *bool + telemetryUrl *string + telemetryEnabled *bool } func init() { @@ -88,6 +90,8 @@ func init() { m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") + m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com:3091/api/collect", "telemetry server URL to send usage statistics") + m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting") } var cmdMaster = &Command{ @@ -332,5 +336,7 @@ func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOp DisableHttp: *m.disableHttp, MetricsAddress: *m.metricsAddress, MetricsIntervalSec: *m.metricsIntervalSec, + TelemetryUrl: *m.telemetryUrl, + TelemetryEnabled: *m.telemetryEnabled, } } diff --git a/weed/command/server.go b/weed/command/server.go index dd3b0c8b4..c1f80b325 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -104,6 +104,8 @@ func init() { masterOptions.raftBootstrap = cmdServer.Flag.Bool("master.raftBootstrap", false, "Whether to bootstrap the Raft cluster") masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") + masterOptions.telemetryUrl = cmdServer.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com:3091/api/collect", "telemetry server URL to send usage statistics") + masterOptions.telemetryEnabled = cmdServer.Flag.Bool("master.telemetry", false, "enable telemetry reporting") filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 6569fdbd4..48576adf4 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -8,11 +8,13 @@ import ( "net/url" "os" "regexp" + "runtime" "strings" "sync" "time" "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/telemetry" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -30,6 +32,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/wdclient" ) @@ -52,6 +55,8 @@ type MasterOption struct { MetricsAddress string MetricsIntervalSec int IsFollower bool + TelemetryUrl string + TelemetryEnabled bool } type MasterServer struct { @@ -76,6 +81,9 @@ type MasterServer struct { adminLocks *AdminLocks Cluster *cluster.Cluster + + // telemetry + telemetryCollector *telemetry.Collector } func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer { @@ -131,6 +139,28 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") + // Initialize telemetry after topology is created + if option.TelemetryEnabled && option.TelemetryUrl != "" { + telemetryClient := telemetry.NewClient(option.TelemetryUrl, option.TelemetryEnabled) + ms.telemetryCollector = telemetry.NewCollector(telemetryClient, ms.Topo, ms.Cluster) + ms.telemetryCollector.SetMasterServer(ms) + + // Set version and OS information + ms.telemetryCollector.SetVersion(version.VERSION_NUMBER) + ms.telemetryCollector.SetOS(runtime.GOOS + "/" + runtime.GOARCH) + + // Determine features and deployment type + features := []string{"master"} + if len(peers) > 1 { + features = append(features, "cluster") + } + ms.telemetryCollector.SetFeatures(features) + ms.telemetryCollector.SetDeployment(telemetry.DetermineDeployment(true, false, len(peers))) + + // Start periodic telemetry collection (every 24 hours) + ms.telemetryCollector.StartPeriodicCollection(24 * time.Hour) + } + ms.guard = security.NewGuard(append(ms.option.WhiteList, whiteList...), signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources2(r) diff --git a/weed/telemetry/client.go b/weed/telemetry/client.go new file mode 100644 index 000000000..528984d4d --- /dev/null +++ b/weed/telemetry/client.go @@ -0,0 +1,100 @@ +package telemetry + +import ( + "bytes" + "fmt" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/seaweedfs/seaweedfs/telemetry/proto" + "github.com/seaweedfs/seaweedfs/weed/glog" + protobuf "google.golang.org/protobuf/proto" +) + +type Client struct { + url string + enabled bool + instanceID string + httpClient *http.Client +} + +// NewClient creates a new telemetry client +func NewClient(url string, enabled bool) *Client { + return &Client{ + url: url, + enabled: enabled, + instanceID: uuid.New().String(), // Generate UUID in memory only + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +// IsEnabled returns whether telemetry is enabled +func (c *Client) IsEnabled() bool { + return c.enabled && c.url != "" +} + +// SendTelemetry sends telemetry data synchronously using protobuf format +func (c *Client) SendTelemetry(data *proto.TelemetryData) error { + if !c.IsEnabled() { + return nil + } + + // Set the cluster ID + data.ClusterId = c.instanceID + + return c.sendProtobuf(data) +} + +// SendTelemetryAsync sends telemetry data asynchronously +func (c *Client) SendTelemetryAsync(data *proto.TelemetryData) { + if !c.IsEnabled() { + return + } + + go func() { + if err := c.SendTelemetry(data); err != nil { + glog.V(1).Infof("Failed to send telemetry: %v", err) + } + }() +} + +// sendProtobuf sends data using protobuf format +func (c *Client) sendProtobuf(data *proto.TelemetryData) error { + req := &proto.TelemetryRequest{ + Data: data, + } + + body, err := protobuf.Marshal(req) + if err != nil { + return fmt.Errorf("failed to marshal protobuf: %v", err) + } + + httpReq, err := http.NewRequest("POST", c.url, bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("User-Agent", fmt.Sprintf("SeaweedFS/%s", data.Version)) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return fmt.Errorf("failed to send request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("server returned status %d", resp.StatusCode) + } + + glog.V(2).Infof("Telemetry sent successfully via protobuf") + return nil +} + +// GetInstanceID returns the current instance ID +func (c *Client) GetInstanceID() string { + return c.instanceID +} diff --git a/weed/telemetry/collector.go b/weed/telemetry/collector.go new file mode 100644 index 000000000..7991d92c8 --- /dev/null +++ b/weed/telemetry/collector.go @@ -0,0 +1,218 @@ +package telemetry + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/telemetry/proto" + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +type Collector struct { + client *Client + topo *topology.Topology + cluster *cluster.Cluster + masterServer interface{} // Will be set to *weed_server.MasterServer to access client tracking + features []string + deployment string + version string + os string +} + +// NewCollector creates a new telemetry collector +func NewCollector(client *Client, topo *topology.Topology, cluster *cluster.Cluster) *Collector { + return &Collector{ + client: client, + topo: topo, + cluster: cluster, + masterServer: nil, + features: []string{}, + deployment: "unknown", + version: "unknown", + os: "unknown", + } +} + +// SetFeatures sets the list of enabled features +func (c *Collector) SetFeatures(features []string) { + c.features = features +} + +// SetDeployment sets the deployment type (standalone, cluster, etc.) +func (c *Collector) SetDeployment(deployment string) { + c.deployment = deployment +} + +// SetVersion sets the SeaweedFS version +func (c *Collector) SetVersion(version string) { + c.version = version +} + +// SetOS sets the operating system information +func (c *Collector) SetOS(os string) { + c.os = os +} + +// SetMasterServer sets a reference to the master server for client tracking +func (c *Collector) SetMasterServer(masterServer interface{}) { + c.masterServer = masterServer +} + +// CollectAndSendAsync collects telemetry data and sends it asynchronously +func (c *Collector) CollectAndSendAsync() { + if !c.client.IsEnabled() { + return + } + + go func() { + data := c.collectData() + c.client.SendTelemetryAsync(data) + }() +} + +// StartPeriodicCollection starts sending telemetry data periodically +func (c *Collector) StartPeriodicCollection(interval time.Duration) { + if !c.client.IsEnabled() { + glog.V(1).Infof("Telemetry is disabled, skipping periodic collection") + return + } + + glog.V(0).Infof("Starting telemetry collection every %v", interval) + + // Send initial telemetry after a short delay + go func() { + time.Sleep(30 * time.Second) // Wait for cluster to stabilize + c.CollectAndSendAsync() + }() + + // Start periodic collection + ticker := time.NewTicker(interval) + go func() { + defer ticker.Stop() + for range ticker.C { + c.CollectAndSendAsync() + } + }() +} + +// collectData gathers telemetry data from the topology +func (c *Collector) collectData() *proto.TelemetryData { + data := &proto.TelemetryData{ + Version: c.version, + Os: c.os, + Features: c.features, + Deployment: c.deployment, + Timestamp: time.Now().Unix(), + } + + if c.topo != nil { + // Collect volume server count + data.VolumeServerCount = int32(c.countVolumeServers()) + + // Collect total disk usage and volume count + diskBytes, volumeCount := c.collectVolumeStats() + data.TotalDiskBytes = diskBytes + data.TotalVolumeCount = int32(volumeCount) + } + + if c.cluster != nil { + // Collect filer and broker counts + data.FilerCount = int32(c.countFilers()) + data.BrokerCount = int32(c.countBrokers()) + } + + return data +} + +// countVolumeServers counts the number of active volume servers +func (c *Collector) countVolumeServers() int { + count := 0 + for _, dcNode := range c.topo.Children() { + dc := dcNode.(*topology.DataCenter) + for _, rackNode := range dc.Children() { + rack := rackNode.(*topology.Rack) + for range rack.Children() { + count++ + } + } + } + return count +} + +// collectVolumeStats collects total disk usage and volume count +func (c *Collector) collectVolumeStats() (uint64, int) { + var totalDiskBytes uint64 + var totalVolumeCount int + + for _, dcNode := range c.topo.Children() { + dc := dcNode.(*topology.DataCenter) + for _, rackNode := range dc.Children() { + rack := rackNode.(*topology.Rack) + for _, dnNode := range rack.Children() { + dn := dnNode.(*topology.DataNode) + volumes := dn.GetVolumes() + for _, volumeInfo := range volumes { + totalVolumeCount++ + totalDiskBytes += volumeInfo.Size + } + } + } + } + + return totalDiskBytes, totalVolumeCount +} + +// countFilers counts the number of active filer servers across all groups +func (c *Collector) countFilers() int { + // Count all filer-type nodes in the cluster + // This includes both pure filer servers and S3 servers (which register as filers) + count := 0 + for _, groupName := range c.getAllFilerGroups() { + nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.FilerType) + count += len(nodes) + } + return count +} + +// countBrokers counts the number of active broker servers +func (c *Collector) countBrokers() int { + // Count brokers across all broker groups + count := 0 + for _, groupName := range c.getAllBrokerGroups() { + nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.BrokerType) + count += len(nodes) + } + return count +} + +// getAllFilerGroups returns all filer group names +func (c *Collector) getAllFilerGroups() []string { + // For simplicity, we check the default group + // In a more sophisticated implementation, we could enumerate all groups + return []string{""} +} + +// getAllBrokerGroups returns all broker group names +func (c *Collector) getAllBrokerGroups() []string { + // For simplicity, we check the default group + // In a more sophisticated implementation, we could enumerate all groups + return []string{""} +} + +// DetermineDeployment determines the deployment type based on configuration +func DetermineDeployment(isMasterEnabled, isVolumeEnabled bool, peerCount int) string { + if isMasterEnabled && isVolumeEnabled { + if peerCount > 1 { + return "cluster" + } + return "standalone" + } + if isMasterEnabled { + return "master-only" + } + if isVolumeEnabled { + return "volume-only" + } + return "unknown" +} |
