aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-06-28 14:11:55 -0700
committerGitHub <noreply@github.com>2025-06-28 14:11:55 -0700
commita1aab8a083111dd9357c7f35397fdf610f23cb6d (patch)
tree66b56b09bec60cd1962236b7aad43a37011450b9 /weed
parent29892c43ff95ad957c0f64ad5cd11e0d43e616e2 (diff)
downloadseaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.tar.xz
seaweedfs-a1aab8a083111dd9357c7f35397fdf610f23cb6d.zip
add telemetry (#6926)
* add telemetry * fix go mod * add default telemetry server url * Update README.md * replace with broker count instead of s3 count * Update telemetry.pb.go * github action to deploy
Diffstat (limited to 'weed')
-rw-r--r--weed/command/master.go6
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/server/master_server.go30
-rw-r--r--weed/telemetry/client.go100
-rw-r--r--weed/telemetry/collector.go218
5 files changed, 356 insertions, 0 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index a8cdf76c6..0761687d9 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -61,6 +61,8 @@ type MasterOptions struct {
electionTimeout *time.Duration
raftHashicorp *bool
raftBootstrap *bool
+ telemetryUrl *string
+ telemetryEnabled *bool
}
func init() {
@@ -88,6 +90,8 @@ func init() {
m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft")
m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
+ m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com:3091/api/collect", "telemetry server URL to send usage statistics")
+ m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting")
}
var cmdMaster = &Command{
@@ -332,5 +336,7 @@ func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOp
DisableHttp: *m.disableHttp,
MetricsAddress: *m.metricsAddress,
MetricsIntervalSec: *m.metricsIntervalSec,
+ TelemetryUrl: *m.telemetryUrl,
+ TelemetryEnabled: *m.telemetryEnabled,
}
}
diff --git a/weed/command/server.go b/weed/command/server.go
index dd3b0c8b4..c1f80b325 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -104,6 +104,8 @@ func init() {
masterOptions.raftBootstrap = cmdServer.Flag.Bool("master.raftBootstrap", false, "Whether to bootstrap the Raft cluster")
masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers")
+ masterOptions.telemetryUrl = cmdServer.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com:3091/api/collect", "telemetry server URL to send usage statistics")
+ masterOptions.telemetryEnabled = cmdServer.Flag.Bool("master.telemetry", false, "enable telemetry reporting")
filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 6569fdbd4..48576adf4 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -8,11 +8,13 @@ import (
"net/url"
"os"
"regexp"
+ "runtime"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/telemetry"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -30,6 +32,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
@@ -52,6 +55,8 @@ type MasterOption struct {
MetricsAddress string
MetricsIntervalSec int
IsFollower bool
+ TelemetryUrl string
+ TelemetryEnabled bool
}
type MasterServer struct {
@@ -76,6 +81,9 @@ type MasterServer struct {
adminLocks *AdminLocks
Cluster *cluster.Cluster
+
+ // telemetry
+ telemetryCollector *telemetry.Collector
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
@@ -131,6 +139,28 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
+ // Initialize telemetry after topology is created
+ if option.TelemetryEnabled && option.TelemetryUrl != "" {
+ telemetryClient := telemetry.NewClient(option.TelemetryUrl, option.TelemetryEnabled)
+ ms.telemetryCollector = telemetry.NewCollector(telemetryClient, ms.Topo, ms.Cluster)
+ ms.telemetryCollector.SetMasterServer(ms)
+
+ // Set version and OS information
+ ms.telemetryCollector.SetVersion(version.VERSION_NUMBER)
+ ms.telemetryCollector.SetOS(runtime.GOOS + "/" + runtime.GOARCH)
+
+ // Determine features and deployment type
+ features := []string{"master"}
+ if len(peers) > 1 {
+ features = append(features, "cluster")
+ }
+ ms.telemetryCollector.SetFeatures(features)
+ ms.telemetryCollector.SetDeployment(telemetry.DetermineDeployment(true, false, len(peers)))
+
+ // Start periodic telemetry collection (every 24 hours)
+ ms.telemetryCollector.StartPeriodicCollection(24 * time.Hour)
+ }
+
ms.guard = security.NewGuard(append(ms.option.WhiteList, whiteList...), signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources2(r)
diff --git a/weed/telemetry/client.go b/weed/telemetry/client.go
new file mode 100644
index 000000000..528984d4d
--- /dev/null
+++ b/weed/telemetry/client.go
@@ -0,0 +1,100 @@
+package telemetry
+
+import (
+ "bytes"
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/seaweedfs/seaweedfs/telemetry/proto"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ protobuf "google.golang.org/protobuf/proto"
+)
+
+type Client struct {
+ url string
+ enabled bool
+ instanceID string
+ httpClient *http.Client
+}
+
+// NewClient creates a new telemetry client
+func NewClient(url string, enabled bool) *Client {
+ return &Client{
+ url: url,
+ enabled: enabled,
+ instanceID: uuid.New().String(), // Generate UUID in memory only
+ httpClient: &http.Client{
+ Timeout: 10 * time.Second,
+ },
+ }
+}
+
+// IsEnabled returns whether telemetry is enabled
+func (c *Client) IsEnabled() bool {
+ return c.enabled && c.url != ""
+}
+
+// SendTelemetry sends telemetry data synchronously using protobuf format
+func (c *Client) SendTelemetry(data *proto.TelemetryData) error {
+ if !c.IsEnabled() {
+ return nil
+ }
+
+ // Set the cluster ID
+ data.ClusterId = c.instanceID
+
+ return c.sendProtobuf(data)
+}
+
+// SendTelemetryAsync sends telemetry data asynchronously
+func (c *Client) SendTelemetryAsync(data *proto.TelemetryData) {
+ if !c.IsEnabled() {
+ return
+ }
+
+ go func() {
+ if err := c.SendTelemetry(data); err != nil {
+ glog.V(1).Infof("Failed to send telemetry: %v", err)
+ }
+ }()
+}
+
+// sendProtobuf sends data using protobuf format
+func (c *Client) sendProtobuf(data *proto.TelemetryData) error {
+ req := &proto.TelemetryRequest{
+ Data: data,
+ }
+
+ body, err := protobuf.Marshal(req)
+ if err != nil {
+ return fmt.Errorf("failed to marshal protobuf: %v", err)
+ }
+
+ httpReq, err := http.NewRequest("POST", c.url, bytes.NewBuffer(body))
+ if err != nil {
+ return fmt.Errorf("failed to create request: %v", err)
+ }
+
+ httpReq.Header.Set("Content-Type", "application/x-protobuf")
+ httpReq.Header.Set("User-Agent", fmt.Sprintf("SeaweedFS/%s", data.Version))
+
+ resp, err := c.httpClient.Do(httpReq)
+ if err != nil {
+ return fmt.Errorf("failed to send request: %v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("server returned status %d", resp.StatusCode)
+ }
+
+ glog.V(2).Infof("Telemetry sent successfully via protobuf")
+ return nil
+}
+
+// GetInstanceID returns the current instance ID
+func (c *Client) GetInstanceID() string {
+ return c.instanceID
+}
diff --git a/weed/telemetry/collector.go b/weed/telemetry/collector.go
new file mode 100644
index 000000000..7991d92c8
--- /dev/null
+++ b/weed/telemetry/collector.go
@@ -0,0 +1,218 @@
+package telemetry
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/telemetry/proto"
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/topology"
+)
+
+type Collector struct {
+ client *Client
+ topo *topology.Topology
+ cluster *cluster.Cluster
+ masterServer interface{} // Will be set to *weed_server.MasterServer to access client tracking
+ features []string
+ deployment string
+ version string
+ os string
+}
+
+// NewCollector creates a new telemetry collector
+func NewCollector(client *Client, topo *topology.Topology, cluster *cluster.Cluster) *Collector {
+ return &Collector{
+ client: client,
+ topo: topo,
+ cluster: cluster,
+ masterServer: nil,
+ features: []string{},
+ deployment: "unknown",
+ version: "unknown",
+ os: "unknown",
+ }
+}
+
+// SetFeatures sets the list of enabled features
+func (c *Collector) SetFeatures(features []string) {
+ c.features = features
+}
+
+// SetDeployment sets the deployment type (standalone, cluster, etc.)
+func (c *Collector) SetDeployment(deployment string) {
+ c.deployment = deployment
+}
+
+// SetVersion sets the SeaweedFS version
+func (c *Collector) SetVersion(version string) {
+ c.version = version
+}
+
+// SetOS sets the operating system information
+func (c *Collector) SetOS(os string) {
+ c.os = os
+}
+
+// SetMasterServer sets a reference to the master server for client tracking
+func (c *Collector) SetMasterServer(masterServer interface{}) {
+ c.masterServer = masterServer
+}
+
+// CollectAndSendAsync collects telemetry data and sends it asynchronously
+func (c *Collector) CollectAndSendAsync() {
+ if !c.client.IsEnabled() {
+ return
+ }
+
+ go func() {
+ data := c.collectData()
+ c.client.SendTelemetryAsync(data)
+ }()
+}
+
+// StartPeriodicCollection starts sending telemetry data periodically
+func (c *Collector) StartPeriodicCollection(interval time.Duration) {
+ if !c.client.IsEnabled() {
+ glog.V(1).Infof("Telemetry is disabled, skipping periodic collection")
+ return
+ }
+
+ glog.V(0).Infof("Starting telemetry collection every %v", interval)
+
+ // Send initial telemetry after a short delay
+ go func() {
+ time.Sleep(30 * time.Second) // Wait for cluster to stabilize
+ c.CollectAndSendAsync()
+ }()
+
+ // Start periodic collection
+ ticker := time.NewTicker(interval)
+ go func() {
+ defer ticker.Stop()
+ for range ticker.C {
+ c.CollectAndSendAsync()
+ }
+ }()
+}
+
+// collectData gathers telemetry data from the topology
+func (c *Collector) collectData() *proto.TelemetryData {
+ data := &proto.TelemetryData{
+ Version: c.version,
+ Os: c.os,
+ Features: c.features,
+ Deployment: c.deployment,
+ Timestamp: time.Now().Unix(),
+ }
+
+ if c.topo != nil {
+ // Collect volume server count
+ data.VolumeServerCount = int32(c.countVolumeServers())
+
+ // Collect total disk usage and volume count
+ diskBytes, volumeCount := c.collectVolumeStats()
+ data.TotalDiskBytes = diskBytes
+ data.TotalVolumeCount = int32(volumeCount)
+ }
+
+ if c.cluster != nil {
+ // Collect filer and broker counts
+ data.FilerCount = int32(c.countFilers())
+ data.BrokerCount = int32(c.countBrokers())
+ }
+
+ return data
+}
+
+// countVolumeServers counts the number of active volume servers
+func (c *Collector) countVolumeServers() int {
+ count := 0
+ for _, dcNode := range c.topo.Children() {
+ dc := dcNode.(*topology.DataCenter)
+ for _, rackNode := range dc.Children() {
+ rack := rackNode.(*topology.Rack)
+ for range rack.Children() {
+ count++
+ }
+ }
+ }
+ return count
+}
+
+// collectVolumeStats collects total disk usage and volume count
+func (c *Collector) collectVolumeStats() (uint64, int) {
+ var totalDiskBytes uint64
+ var totalVolumeCount int
+
+ for _, dcNode := range c.topo.Children() {
+ dc := dcNode.(*topology.DataCenter)
+ for _, rackNode := range dc.Children() {
+ rack := rackNode.(*topology.Rack)
+ for _, dnNode := range rack.Children() {
+ dn := dnNode.(*topology.DataNode)
+ volumes := dn.GetVolumes()
+ for _, volumeInfo := range volumes {
+ totalVolumeCount++
+ totalDiskBytes += volumeInfo.Size
+ }
+ }
+ }
+ }
+
+ return totalDiskBytes, totalVolumeCount
+}
+
+// countFilers counts the number of active filer servers across all groups
+func (c *Collector) countFilers() int {
+ // Count all filer-type nodes in the cluster
+ // This includes both pure filer servers and S3 servers (which register as filers)
+ count := 0
+ for _, groupName := range c.getAllFilerGroups() {
+ nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.FilerType)
+ count += len(nodes)
+ }
+ return count
+}
+
+// countBrokers counts the number of active broker servers
+func (c *Collector) countBrokers() int {
+ // Count brokers across all broker groups
+ count := 0
+ for _, groupName := range c.getAllBrokerGroups() {
+ nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.BrokerType)
+ count += len(nodes)
+ }
+ return count
+}
+
+// getAllFilerGroups returns all filer group names
+func (c *Collector) getAllFilerGroups() []string {
+ // For simplicity, we check the default group
+ // In a more sophisticated implementation, we could enumerate all groups
+ return []string{""}
+}
+
+// getAllBrokerGroups returns all broker group names
+func (c *Collector) getAllBrokerGroups() []string {
+ // For simplicity, we check the default group
+ // In a more sophisticated implementation, we could enumerate all groups
+ return []string{""}
+}
+
+// DetermineDeployment determines the deployment type based on configuration
+func DetermineDeployment(isMasterEnabled, isVolumeEnabled bool, peerCount int) string {
+ if isMasterEnabled && isVolumeEnabled {
+ if peerCount > 1 {
+ return "cluster"
+ }
+ return "standalone"
+ }
+ if isMasterEnabled {
+ return "master-only"
+ }
+ if isVolumeEnabled {
+ return "volume-only"
+ }
+ return "unknown"
+}