path: root/weed/worker/tasks/erasure_coding/monitoring.go
author    Chris Lu <chrislusf@users.noreply.github.com>  2025-08-01 11:18:32 -0700
committer GitHub <noreply@github.com>  2025-08-01 11:18:32 -0700
commit    0975968e71b05368d5f28f788cf863c2042c2696 (patch)
tree      5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/erasure_coding/monitoring.go
parent    1cba609bfa2306cc2885df212febd5ff954aa693 (diff)
download  seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz
          seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/erasure_coding/monitoring.go')
-rw-r--r--  weed/worker/tasks/erasure_coding/monitoring.go | 229
1 file changed, 229 insertions(+), 0 deletions(-)
diff --git a/weed/worker/tasks/erasure_coding/monitoring.go b/weed/worker/tasks/erasure_coding/monitoring.go
new file mode 100644
index 000000000..799eb62c8
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/monitoring.go
@@ -0,0 +1,229 @@
+package erasure_coding
+
+import (
+ "sync"
+ "time"
+)
+
+// ErasureCodingMetrics contains erasure coding-specific monitoring data
+type ErasureCodingMetrics struct {
+ // Execution metrics
+ VolumesEncoded int64 `json:"volumes_encoded"`
+ TotalShardsCreated int64 `json:"total_shards_created"`
+ TotalDataProcessed int64 `json:"total_data_processed"`
+ TotalSourcesRemoved int64 `json:"total_sources_removed"`
+ LastEncodingTime time.Time `json:"last_encoding_time"`
+
+ // Performance metrics
+ AverageEncodingTime int64 `json:"average_encoding_time_seconds"`
+ AverageShardSize int64 `json:"average_shard_size"`
+ AverageDataShards int `json:"average_data_shards"`
+ AverageParityShards int `json:"average_parity_shards"`
+ SuccessfulOperations int64 `json:"successful_operations"`
+ FailedOperations int64 `json:"failed_operations"`
+
+ // Distribution metrics
+ ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"`
+ ShardsPerRack map[string]int64 `json:"shards_per_rack"`
+ PlacementSuccessRate float64 `json:"placement_success_rate"`
+
+ // Current task metrics
+ CurrentVolumeSize int64 `json:"current_volume_size"`
+ CurrentShardCount int `json:"current_shard_count"`
+ VolumesPendingEncoding int `json:"volumes_pending_encoding"`
+
+ mutex sync.RWMutex
+}
+
+// NewErasureCodingMetrics creates a new erasure coding metrics instance
+func NewErasureCodingMetrics() *ErasureCodingMetrics {
+ return &ErasureCodingMetrics{
+ LastEncodingTime: time.Now(),
+ ShardsPerDataCenter: make(map[string]int64),
+ ShardsPerRack: make(map[string]int64),
+ }
+}
+
+// RecordVolumeEncoded records a successful volume encoding operation
+func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.VolumesEncoded++
+ m.TotalShardsCreated += int64(shardsCreated)
+ m.TotalDataProcessed += volumeSize
+ m.SuccessfulOperations++
+ m.LastEncodingTime = time.Now()
+
+ if sourceRemoved {
+ m.TotalSourcesRemoved++
+ }
+
+ // Update average encoding time
+ if m.AverageEncodingTime == 0 {
+ m.AverageEncodingTime = int64(encodingTime.Seconds())
+ } else {
+ // Exponential moving average
+ newTime := int64(encodingTime.Seconds())
+ m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5
+ }
+
+ // Update average shard size
+ if shardsCreated > 0 {
+ avgShardSize := volumeSize / int64(shardsCreated)
+ if m.AverageShardSize == 0 {
+ m.AverageShardSize = avgShardSize
+ } else {
+ m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5
+ }
+ }
+
+ // Update average data/parity shards
+ if m.AverageDataShards == 0 {
+ m.AverageDataShards = dataShards
+ m.AverageParityShards = parityShards
+ } else {
+ m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
+ m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
+ }
+}
+
+// RecordFailure records a failed erasure coding operation
+func (m *ErasureCodingMetrics) RecordFailure() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.FailedOperations++
+}
+
+// RecordShardPlacement records shard placement for distribution tracking
+func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.ShardsPerDataCenter[dataCenter]++
+ rackKey := dataCenter + ":" + rack
+ m.ShardsPerRack[rackKey]++
+}
+
+// UpdateCurrentVolumeInfo updates current volume processing information
+func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.CurrentVolumeSize = volumeSize
+ m.CurrentShardCount = shardCount
+}
+
+// SetVolumesPendingEncoding sets the number of volumes pending encoding
+func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.VolumesPendingEncoding = count
+}
+
+// UpdatePlacementSuccessRate updates the placement success rate
+func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ if m.PlacementSuccessRate == 0 {
+ m.PlacementSuccessRate = rate
+ } else {
+ // Exponential moving average
+ m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
+ }
+}
+
+// GetMetrics returns a copy of the current metrics (without the mutex)
+func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ // Create deep copy of maps
+ shardsPerDC := make(map[string]int64)
+ for k, v := range m.ShardsPerDataCenter {
+ shardsPerDC[k] = v
+ }
+
+ shardsPerRack := make(map[string]int64)
+ for k, v := range m.ShardsPerRack {
+ shardsPerRack[k] = v
+ }
+
+ // Create a copy without the mutex to avoid copying lock value
+ return ErasureCodingMetrics{
+ VolumesEncoded: m.VolumesEncoded,
+ TotalShardsCreated: m.TotalShardsCreated,
+ TotalDataProcessed: m.TotalDataProcessed,
+ TotalSourcesRemoved: m.TotalSourcesRemoved,
+ LastEncodingTime: m.LastEncodingTime,
+ AverageEncodingTime: m.AverageEncodingTime,
+ AverageShardSize: m.AverageShardSize,
+ AverageDataShards: m.AverageDataShards,
+ AverageParityShards: m.AverageParityShards,
+ SuccessfulOperations: m.SuccessfulOperations,
+ FailedOperations: m.FailedOperations,
+ ShardsPerDataCenter: shardsPerDC,
+ ShardsPerRack: shardsPerRack,
+ PlacementSuccessRate: m.PlacementSuccessRate,
+ CurrentVolumeSize: m.CurrentVolumeSize,
+ CurrentShardCount: m.CurrentShardCount,
+ VolumesPendingEncoding: m.VolumesPendingEncoding,
+ }
+}
+
+// GetSuccessRate returns the success rate as a percentage
+func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ total := m.SuccessfulOperations + m.FailedOperations
+ if total == 0 {
+ return 100.0
+ }
+ return float64(m.SuccessfulOperations) / float64(total) * 100.0
+}
+
+// GetAverageDataProcessed returns the average data processed per volume
+func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ if m.VolumesEncoded == 0 {
+ return 0
+ }
+ return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
+}
+
+// GetSourceRemovalRate returns the percentage of sources removed after encoding
+func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ if m.VolumesEncoded == 0 {
+ return 0
+ }
+ return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
+}
+
+// Reset resets all metrics to zero
+func (m *ErasureCodingMetrics) Reset() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ // Reset the fields in place: assigning a fresh struct literal to *m would
+ // also zero the locked mutex, and the deferred Unlock would then panic
+ // with "sync: Unlock of unlocked RWMutex".
+ m.VolumesEncoded, m.SuccessfulOperations, m.FailedOperations = 0, 0, 0
+ m.TotalShardsCreated, m.TotalDataProcessed, m.TotalSourcesRemoved = 0, 0, 0
+ m.AverageEncodingTime, m.AverageShardSize = 0, 0
+ m.AverageDataShards, m.AverageParityShards = 0, 0
+ m.PlacementSuccessRate = 0
+ m.CurrentVolumeSize, m.CurrentShardCount, m.VolumesPendingEncoding = 0, 0, 0
+ m.LastEncodingTime = time.Now()
+ m.ShardsPerDataCenter = make(map[string]int64)
+ m.ShardsPerRack = make(map[string]int64)
+}
+
+// Global metrics instance for erasure coding tasks
+var globalErasureCodingMetrics = NewErasureCodingMetrics()
+
+// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance
+func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
+ return globalErasureCodingMetrics
+}
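
For context (not part of this commit), a minimal sketch of how a worker task could feed and query these metrics. The import path assumes the github.com/seaweedfs/seaweedfs module layout, and the volume size, shard counts, and placement names are made-up example values:

// usage_example.go (illustrative only)
package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
)

func main() {
	metrics := erasure_coding.GetGlobalErasureCodingMetrics()

	start := time.Now()
	// ... run the actual encoding of one volume here ...
	metrics.RecordVolumeEncoded(
		30*1024*1024*1024, // volumeSize in bytes (example value)
		14,                // shardsCreated (10 data + 4 parity)
		10,                // dataShards
		4,                 // parityShards
		time.Since(start), // encodingTime
		true,              // sourceRemoved: the source volume was deleted
	)
	metrics.RecordShardPlacement("dc1", "rack1") // example data center and rack

	fmt.Printf("EC success rate: %.1f%%, avg bytes per volume: %.0f\n",
		metrics.GetSuccessRate(), metrics.GetAverageDataProcessed())
}

Because every method acquires the struct's RWMutex internally, concurrent workers can record into the shared global instance without additional synchronization.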