aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/erasure_coding/monitoring.go
blob: 799eb62c812daceaec2ac0f9ff7e5d95a7a4b107 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package erasure_coding

import (
	"sync"
	"time"
)

// ErasureCodingMetrics contains erasure coding-specific monitoring data.
//
// All fields are guarded by the embedded mutex; callers must go through the
// methods on this type rather than touching fields directly. Because the
// struct embeds a lock it must never be copied — use GetMetrics to obtain
// a safe snapshot.
type ErasureCodingMetrics struct {
	// Execution metrics
	VolumesEncoded      int64     `json:"volumes_encoded"`       // total volumes successfully encoded
	TotalShardsCreated  int64     `json:"total_shards_created"`  // cumulative shard count across all encodings
	TotalDataProcessed  int64     `json:"total_data_processed"`  // cumulative volume bytes processed
	TotalSourcesRemoved int64     `json:"total_sources_removed"` // encodings after which the source volume was removed
	LastEncodingTime    time.Time `json:"last_encoding_time"`    // wall-clock time of the most recent encoding

	// Performance metrics (averages are exponential moving averages, 4:1 weighted toward history)
	AverageEncodingTime  int64 `json:"average_encoding_time_seconds"`
	AverageShardSize     int64 `json:"average_shard_size"`
	AverageDataShards    int   `json:"average_data_shards"`
	AverageParityShards  int   `json:"average_parity_shards"`
	SuccessfulOperations int64 `json:"successful_operations"`
	FailedOperations     int64 `json:"failed_operations"`

	// Distribution metrics
	ShardsPerDataCenter  map[string]int64 `json:"shards_per_datacenter"` // shards placed per data center
	ShardsPerRack        map[string]int64 `json:"shards_per_rack"`       // keyed "<dataCenter>:<rack>"
	PlacementSuccessRate float64          `json:"placement_success_rate"`

	// Current task metrics
	CurrentVolumeSize      int64 `json:"current_volume_size"`
	CurrentShardCount      int   `json:"current_shard_count"`
	VolumesPendingEncoding int   `json:"volumes_pending_encoding"`

	mutex sync.RWMutex // guards every field above
}

// NewErasureCodingMetrics creates a new erasure coding metrics instance
// with both placement maps initialized and the last-encoding timestamp
// seeded to the current time.
func NewErasureCodingMetrics() *ErasureCodingMetrics {
	m := &ErasureCodingMetrics{
		ShardsPerDataCenter: map[string]int64{},
		ShardsPerRack:       map[string]int64{},
	}
	m.LastEncodingTime = time.Now()
	return m
}

// RecordVolumeEncoded records a successful volume encoding operation,
// updating totals and folding this run's numbers into the smoothed
// averages (exponential moving average, weighted 4:1 toward history;
// the first sample seeds an average directly).
func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Counters and totals.
	m.VolumesEncoded++
	m.SuccessfulOperations++
	m.TotalShardsCreated += int64(shardsCreated)
	m.TotalDataProcessed += volumeSize
	m.LastEncodingTime = time.Now()
	if sourceRemoved {
		m.TotalSourcesRemoved++
	}

	// smooth folds a sample into a running average; a zero previous value
	// means "unseeded", so the sample is taken as-is.
	smooth := func(prev, sample int64) int64 {
		if prev == 0 {
			return sample
		}
		return (prev*4 + sample) / 5
	}

	m.AverageEncodingTime = smooth(m.AverageEncodingTime, int64(encodingTime.Seconds()))

	if shardsCreated > 0 {
		m.AverageShardSize = smooth(m.AverageShardSize, volumeSize/int64(shardsCreated))
	}

	// Data/parity shard averages are seeded together on the first call
	// (keyed off AverageDataShards, matching the original seeding rule).
	if m.AverageDataShards == 0 {
		m.AverageDataShards = dataShards
		m.AverageParityShards = parityShards
	} else {
		m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
		m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
	}
}

// RecordFailure increments the failed-operation counter for a
// failed erasure coding operation.
func (m *ErasureCodingMetrics) RecordFailure() {
	m.mutex.Lock()
	m.FailedOperations++
	m.mutex.Unlock()
}

// RecordShardPlacement records where a shard landed so distribution across
// data centers and racks can be tracked. Rack counts are keyed as
// "<dataCenter>:<rack>" so identically named racks in different data
// centers stay distinct.
func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.ShardsPerDataCenter[dataCenter]++
	m.ShardsPerRack[dataCenter+":"+rack]++
}

// UpdateCurrentVolumeInfo records the size and shard count of the volume
// currently being processed.
func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
	m.mutex.Lock()
	m.CurrentVolumeSize = volumeSize
	m.CurrentShardCount = shardCount
	m.mutex.Unlock()
}

// SetVolumesPendingEncoding records how many volumes are still waiting to
// be encoded.
func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
	m.mutex.Lock()
	m.VolumesPendingEncoding = count
	m.mutex.Unlock()
}

// UpdatePlacementSuccessRate folds a new placement success sample into the
// smoothed rate (80% history, 20% new sample). The first sample seeds the
// rate directly.
func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	switch {
	case m.PlacementSuccessRate == 0:
		m.PlacementSuccessRate = rate
	default:
		// Exponential moving average.
		m.PlacementSuccessRate = m.PlacementSuccessRate*0.8 + rate*0.2
	}
}

// GetMetrics returns a point-in-time snapshot of the metrics. The maps are
// deep-copied and the mutex is deliberately left out of the returned value
// so callers never copy or hold the live lock.
func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	snapshot := ErasureCodingMetrics{
		VolumesEncoded:         m.VolumesEncoded,
		TotalShardsCreated:     m.TotalShardsCreated,
		TotalDataProcessed:     m.TotalDataProcessed,
		TotalSourcesRemoved:    m.TotalSourcesRemoved,
		LastEncodingTime:       m.LastEncodingTime,
		AverageEncodingTime:    m.AverageEncodingTime,
		AverageShardSize:       m.AverageShardSize,
		AverageDataShards:      m.AverageDataShards,
		AverageParityShards:    m.AverageParityShards,
		SuccessfulOperations:   m.SuccessfulOperations,
		FailedOperations:       m.FailedOperations,
		PlacementSuccessRate:   m.PlacementSuccessRate,
		CurrentVolumeSize:      m.CurrentVolumeSize,
		CurrentShardCount:      m.CurrentShardCount,
		VolumesPendingEncoding: m.VolumesPendingEncoding,
		ShardsPerDataCenter:    make(map[string]int64, len(m.ShardsPerDataCenter)),
		ShardsPerRack:          make(map[string]int64, len(m.ShardsPerRack)),
	}

	// Deep-copy the maps so the snapshot is isolated from future updates.
	for dc, n := range m.ShardsPerDataCenter {
		snapshot.ShardsPerDataCenter[dc] = n
	}
	for rackKey, n := range m.ShardsPerRack {
		snapshot.ShardsPerRack[rackKey] = n
	}

	return snapshot
}

// GetSuccessRate returns the percentage of operations that succeeded.
// With no recorded operations it optimistically reports 100%.
func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if ops := m.SuccessfulOperations + m.FailedOperations; ops > 0 {
		return float64(m.SuccessfulOperations) / float64(ops) * 100.0
	}
	return 100.0
}

// GetAverageDataProcessed returns the mean number of bytes processed per
// encoded volume, or 0 when nothing has been encoded yet.
func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if m.VolumesEncoded > 0 {
		return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
	}
	return 0
}

// GetSourceRemovalRate returns the percentage of encodings after which the
// source volume was removed, or 0 when nothing has been encoded yet.
func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if m.VolumesEncoded > 0 {
		return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
	}
	return 0
}

// Reset clears all metrics back to their initial state: counters and
// averages zeroed, placement maps replaced with empty ones, and the
// last-encoding timestamp reset to now.
//
// BUG FIX: the previous implementation did `*m = ErasureCodingMetrics{...}`
// while holding m.mutex. That assignment overwrote the locked mutex with a
// zero (unlocked) one, so the deferred Unlock panicked with
// "sync: unlock of unlocked mutex" (it is also a `go vet` copylocks
// violation). Fields must be reset individually instead.
func (m *ErasureCodingMetrics) Reset() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.VolumesEncoded = 0
	m.TotalShardsCreated = 0
	m.TotalDataProcessed = 0
	m.TotalSourcesRemoved = 0
	m.LastEncodingTime = time.Now()
	m.AverageEncodingTime = 0
	m.AverageShardSize = 0
	m.AverageDataShards = 0
	m.AverageParityShards = 0
	m.SuccessfulOperations = 0
	m.FailedOperations = 0
	m.ShardsPerDataCenter = make(map[string]int64)
	m.ShardsPerRack = make(map[string]int64)
	m.PlacementSuccessRate = 0
	m.CurrentVolumeSize = 0
	m.CurrentShardCount = 0
	m.VolumesPendingEncoding = 0
}

// Global metrics instance for erasure coding tasks
// (package-level singleton, initialized at package load).
var globalErasureCodingMetrics = NewErasureCodingMetrics()

// GetGlobalErasureCodingMetrics returns the global erasure coding metrics
// instance shared by all erasure coding tasks in this process.
func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
	return globalErasureCodingMetrics
}