aboutsummaryrefslogtreecommitdiff
path: root/weed/stats/cluster_metrics.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-03-25 12:19:53 -0700
committerchrislu <chris.lu@gmail.com>2025-03-25 12:19:53 -0700
commitc2d6fabd935c85527fcfe024b0ef4dda3675aab1 (patch)
treec71816e4bfb92aad0207dd8a8d41bf51b57a5ed7 /weed/stats/cluster_metrics.go
parent138b66231a90f3bdea829d1567cd8f34d91010f1 (diff)
downloadseaweedfs-origin/collect-public-metrics.tar.xz
seaweedfs-origin/collect-public-metrics.zip
add option to collect metricsorigin/collect-public-metrics
Diffstat (limited to 'weed/stats/cluster_metrics.go')
-rw-r--r--weed/stats/cluster_metrics.go186
1 files changed, 186 insertions, 0 deletions
diff --git a/weed/stats/cluster_metrics.go b/weed/stats/cluster_metrics.go
new file mode 100644
index 000000000..53525f04b
--- /dev/null
+++ b/weed/stats/cluster_metrics.go
@@ -0,0 +1,186 @@
+package stats
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "sync"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/push"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+const (
+ PushGatewayURL = "http://metrics.seaweedfs.com:9091"
+)
+
+var (
+ ClusterMetrics = prometheus.NewRegistry()
+
+ ClusterId = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "id",
+ Help: "Unique cluster identifier",
+ })
+
+ ClusterVolumeCount = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "volume_count",
+ Help: "Total number of volumes in the cluster",
+ })
+
+ ClusterTotalCapacity = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "total_capacity_bytes",
+ Help: "Total storage capacity in bytes",
+ })
+
+ ClusterUsedCapacity = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "used_capacity_bytes",
+ Help: "Used storage capacity in bytes",
+ })
+
+ ClusterVersion = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "version",
+ Help: "Cluster version",
+ })
+
+ ClusterVolumeServerCount = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "volume_server_count",
+ Help: "Number of volume servers in the cluster",
+ })
+
+ ClusterStorageDistribution = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "storage_distribution_bytes",
+ Help: "Storage distribution by data center and rack",
+ }, []string{"datacenter", "rack"})
+
+ ClusterUsedStorageDistribution = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: "cluster",
+ Name: "used_storage_distribution_bytes",
+ Help: "Used storage distribution by data center and rack",
+ }, []string{"datacenter", "rack"})
+
+ pusher *push.Pusher
+)
+
+func init() {
+ ClusterMetrics.MustRegister(ClusterId)
+ ClusterMetrics.MustRegister(ClusterVolumeCount)
+ ClusterMetrics.MustRegister(ClusterTotalCapacity)
+ ClusterMetrics.MustRegister(ClusterUsedCapacity)
+ ClusterMetrics.MustRegister(ClusterVersion)
+ ClusterMetrics.MustRegister(ClusterVolumeServerCount)
+ ClusterMetrics.MustRegister(ClusterStorageDistribution)
+ ClusterMetrics.MustRegister(ClusterUsedStorageDistribution)
+
+ // Initialize the push gateway client
+ pusher = push.New(PushGatewayURL, "seaweedfs_cluster").Gatherer(ClusterMetrics)
+}
+
+var (
+ clusterIdOnce sync.Once
+ clusterId string
+)
+
+// GenerateClusterId creates a unique cluster ID based on the first volume server's address
+func GenerateClusterId(topologyInfo *master_pb.TopologyInfo) string {
+ clusterIdOnce.Do(func() {
+ if len(topologyInfo.DataCenterInfos) > 0 {
+ dc := topologyInfo.DataCenterInfos[0]
+ if len(dc.RackInfos) > 0 {
+ rack := dc.RackInfos[0]
+ if len(rack.DataNodeInfos) > 0 {
+ node := rack.DataNodeInfos[0]
+ hash := sha256.Sum256([]byte(node.Id))
+ clusterId = hex.EncodeToString(hash[:8])
+ }
+ }
+ }
+ if clusterId == "" {
+ clusterId = "unknown"
+ }
+ })
+ return clusterId
+}
+
+// UpdateClusterMetrics updates all cluster-related metrics
+func UpdateClusterMetrics(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMB uint32) {
+ // Set cluster ID
+ GenerateClusterId(topologyInfo) // Generate but don't store the ID
+
+ // Set version
+ ClusterVersion.Set(0) // Reset to 0 since we're using a string version
+
+ // Calculate total metrics
+ var totalVolumeCount uint64
+ var totalCapacity uint64
+ var totalUsedCapacity uint64
+ var volumeServerCount uint64
+
+ // Reset all distribution metrics
+ ClusterStorageDistribution.Reset()
+ ClusterUsedStorageDistribution.Reset()
+
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ var rackVolumeCount uint64
+ var rackCapacity uint64
+ var rackUsedCapacity uint64
+
+ for _, node := range rack.DataNodeInfos {
+ volumeServerCount++
+ for _, diskInfo := range node.DiskInfos {
+ rackCapacity += uint64(diskInfo.MaxVolumeCount) * uint64(volumeSizeLimitMB) * 1024 * 1024
+ for _, volumeInfo := range diskInfo.VolumeInfos {
+ rackVolumeCount++
+ rackUsedCapacity += volumeInfo.Size
+ }
+ }
+ }
+
+ // Update distribution metrics
+ ClusterStorageDistribution.WithLabelValues(dc.Id, rack.Id).Set(float64(rackCapacity))
+ ClusterUsedStorageDistribution.WithLabelValues(dc.Id, rack.Id).Set(float64(rackUsedCapacity))
+
+ // Update total metrics
+ totalVolumeCount += rackVolumeCount
+ totalCapacity += rackCapacity
+ totalUsedCapacity += rackUsedCapacity
+ }
+ }
+
+ // Update total metrics
+ ClusterVolumeCount.Set(float64(totalVolumeCount))
+ ClusterTotalCapacity.Set(float64(totalCapacity))
+ ClusterUsedCapacity.Set(float64(totalUsedCapacity))
+ ClusterVolumeServerCount.Set(float64(volumeServerCount))
+}
+
+// PushMetrics sends the current metrics to the push gateway
+func PushMetrics(clusterId string, enabled bool) error {
+ if !enabled || pusher == nil {
+ return nil
+ }
+ return pusher.Push()
+}