package telemetry

import (
	"time"

	"github.com/seaweedfs/seaweedfs/telemetry/proto"
	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/topology"
)

// Collector gathers telemetry data from a SeaweedFS cluster.
// Only the leader master sends telemetry, to avoid duplicate reports.
type Collector struct {
	client       *Client
	topo         *topology.Topology
	cluster      *cluster.Cluster
	masterServer interface{} // will be set to *weed_server.MasterServer to access client tracking
	version      string
	os           string
}

// NewCollector creates a new telemetry collector.
func NewCollector(client *Client, topo *topology.Topology, cluster *cluster.Cluster) *Collector {
	return &Collector{
		client:       client,
		topo:         topo,
		cluster:      cluster,
		masterServer: nil,
		version:      "unknown",
		os:           "unknown",
	}
}
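
// Example wiring (a sketch, not the actual call site; the client and
// topology values are assumed to come from master startup, and the
// 24h interval is illustrative):
//
//	collector := telemetry.NewCollector(telemetryClient, topo, clusterInstance)
//	collector.SetVersion("3.x")
//	collector.SetOS(runtime.GOOS + "/" + runtime.GOARCH)
//	collector.StartPeriodicCollection(24 * time.Hour)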

// SetVersion sets the SeaweedFS version reported in telemetry.
func (c *Collector) SetVersion(version string) {
	c.version = version
}

// SetOS sets the operating system information reported in telemetry.
func (c *Collector) SetOS(os string) {
	c.os = os
}

// SetMasterServer stores a reference to the master server for client tracking.
func (c *Collector) SetMasterServer(masterServer interface{}) {
	c.masterServer = masterServer
}

// isLeader reports whether this master is currently the cluster leader.
func (c *Collector) isLeader() bool {
	if c.topo == nil {
		return false
	}
	return c.topo.IsLeader()
}

// CollectAndSendAsync collects telemetry data and sends it asynchronously.
// It does not check leadership itself; callers (see StartPeriodicCollection)
// are expected to ensure that only the leader master reports.
func (c *Collector) CollectAndSendAsync() {
	if !c.client.IsEnabled() {
		return
	}
	go func() {
		data := c.collectData()
		c.client.SendTelemetryAsync(data)
	}()
}

// StartPeriodicCollection starts sending telemetry data periodically.
func (c *Collector) StartPeriodicCollection(interval time.Duration) {
	if !c.client.IsEnabled() {
		glog.V(1).Infof("Telemetry is disabled, skipping periodic collection")
		return
	}

	glog.V(0).Infof("Starting telemetry collection every %v", interval)

	// Send initial telemetry after a short delay to let the cluster
	// stabilize and elect a leader.
	go func() {
		time.Sleep(61 * time.Second)
		if c.isLeader() {
			c.CollectAndSendAsync()
		} else {
			glog.V(2).Infof("Skipping initial telemetry collection - not the leader master")
		}
	}()

	// Start periodic collection, checking leadership before each run.
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for range ticker.C {
			if c.isLeader() {
				c.CollectAndSendAsync()
			} else {
				glog.V(2).Infof("Skipping periodic telemetry collection - not the leader master")
			}
		}
	}()
}
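
// Note that the ticker goroutine runs for the lifetime of the process and has
// no stop channel. Leadership is re-evaluated on every tick rather than once
// at startup, so a master that loses leadership stops reporting and the new
// leader picks up at its next tick.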

// collectData gathers telemetry data from the topology and cluster.
func (c *Collector) collectData() *proto.TelemetryData {
	data := &proto.TelemetryData{
		Version:   c.version,
		Os:        c.os,
		Timestamp: time.Now().Unix(),
	}

	if c.topo != nil {
		// Volume server count, total disk usage, and volume count all
		// come from the master's topology tree.
		data.VolumeServerCount = int32(c.countVolumeServers())
		diskBytes, volumeCount := c.collectVolumeStats()
		data.TotalDiskBytes = diskBytes
		data.TotalVolumeCount = int32(volumeCount)
	}

	if c.cluster != nil {
		// Filer and broker counts come from cluster membership.
		data.FilerCount = int32(c.countFilers())
		data.BrokerCount = int32(c.countBrokers())
	}

	return data
}

// countVolumeServers counts the active volume servers by walking the
// topology tree: data center -> rack -> data node.
func (c *Collector) countVolumeServers() int {
	count := 0
	for _, dcNode := range c.topo.Children() {
		dc := dcNode.(*topology.DataCenter)
		for _, rackNode := range dc.Children() {
			rack := rackNode.(*topology.Rack)
			count += len(rack.Children())
		}
	}
	return count
}

// collectVolumeStats sums disk usage and volume count across all data nodes.
func (c *Collector) collectVolumeStats() (uint64, int) {
	var totalDiskBytes uint64
	var totalVolumeCount int

	for _, dcNode := range c.topo.Children() {
		dc := dcNode.(*topology.DataCenter)
		for _, rackNode := range dc.Children() {
			rack := rackNode.(*topology.Rack)
			for _, dnNode := range rack.Children() {
				dn := dnNode.(*topology.DataNode)
				for _, volumeInfo := range dn.GetVolumes() {
					totalVolumeCount++
					totalDiskBytes += volumeInfo.Size
				}
			}
		}
	}

	return totalDiskBytes, totalVolumeCount
}

// countFilers counts the active filer servers across all filer groups.
// The count includes both pure filer servers and S3 servers, which
// register as filers.
func (c *Collector) countFilers() int {
	count := 0
	for _, groupName := range c.getAllFilerGroups() {
		nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.FilerType)
		count += len(nodes)
	}
	return count
}

// countBrokers counts the active broker servers across all broker groups.
func (c *Collector) countBrokers() int {
	count := 0
	for _, groupName := range c.getAllBrokerGroups() {
		nodes := c.cluster.ListClusterNode(cluster.FilerGroupName(groupName), cluster.BrokerType)
		count += len(nodes)
	}
	return count
}

// getAllFilerGroups returns all filer group names. For simplicity only the
// default (empty) group is checked; a more sophisticated implementation
// could enumerate all groups.
func (c *Collector) getAllFilerGroups() []string {
	return []string{""}
}

// getAllBrokerGroups returns all broker group names. As above, only the
// default (empty) group is checked.
func (c *Collector) getAllBrokerGroups() []string {
	return []string{""}
}
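
// Sketch of how group enumeration could look if the cluster package exposed
// a group-listing call (ListGroups here is hypothetical, not an existing
// API):
//
//	for _, group := range c.cluster.ListGroups(cluster.FilerType) {
//		nodes := c.cluster.ListClusterNode(group, cluster.FilerType)
//		count += len(nodes)
//	}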