diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/erasure_coding/monitoring.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/erasure_coding/monitoring.go')
| -rw-r--r-- | weed/worker/tasks/erasure_coding/monitoring.go | 229 |
1 files changed, 229 insertions, 0 deletions
// Source: weed/worker/tasks/erasure_coding/monitoring.go (added as a new file
// by this commit). The file belongs to package erasure_coding and imports the
// standard-library packages "sync" and "time".

// ErasureCodingMetrics contains erasure coding-specific monitoring data.
//
// All methods on *ErasureCodingMetrics take the embedded mutex, so a single
// instance can be shared between goroutines. Because the struct contains a
// mutex, share it by pointer; use GetMetrics to obtain a detached snapshot.
type ErasureCodingMetrics struct {
	// Execution metrics
	VolumesEncoded      int64     `json:"volumes_encoded"`
	TotalShardsCreated  int64     `json:"total_shards_created"`
	TotalDataProcessed  int64     `json:"total_data_processed"`
	TotalSourcesRemoved int64     `json:"total_sources_removed"`
	LastEncodingTime    time.Time `json:"last_encoding_time"`

	// Performance metrics (moving averages, see RecordVolumeEncoded)
	AverageEncodingTime  int64 `json:"average_encoding_time_seconds"`
	AverageShardSize     int64 `json:"average_shard_size"`
	AverageDataShards    int   `json:"average_data_shards"`
	AverageParityShards  int   `json:"average_parity_shards"`
	SuccessfulOperations int64 `json:"successful_operations"`
	FailedOperations     int64 `json:"failed_operations"`

	// Distribution metrics
	ShardsPerDataCenter  map[string]int64 `json:"shards_per_datacenter"`
	ShardsPerRack        map[string]int64 `json:"shards_per_rack"` // keyed by "<dataCenter>:<rack>"
	PlacementSuccessRate float64          `json:"placement_success_rate"`

	// Current task metrics
	CurrentVolumeSize      int64 `json:"current_volume_size"`
	CurrentShardCount      int   `json:"current_shard_count"`
	VolumesPendingEncoding int   `json:"volumes_pending_encoding"`

	// mutex guards every field above.
	mutex sync.RWMutex
}

// NewErasureCodingMetrics creates a new erasure coding metrics instance with
// empty distribution maps and LastEncodingTime set to the current time.
func NewErasureCodingMetrics() *ErasureCodingMetrics {
	return &ErasureCodingMetrics{
		LastEncodingTime:    time.Now(),
		ShardsPerDataCenter: make(map[string]int64),
		ShardsPerRack:       make(map[string]int64),
	}
}

// RecordVolumeEncoded records a successful volume encoding operation.
//
// It bumps the volume, shard, data and success counters, stamps
// LastEncodingTime with the current time, counts the source as removed when
// sourceRemoved is true, and folds the new sample into the moving averages
// with a weight of 1/5 (the previous average keeps 4/5).
//
// The averages use integer arithmetic: encodingTime is truncated to whole
// seconds, and a stored average of 0 is treated as "no sample yet", so the
// next sample replaces it outright instead of being blended in.
func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.VolumesEncoded++
	m.TotalShardsCreated += int64(shardsCreated)
	m.TotalDataProcessed += volumeSize
	m.SuccessfulOperations++
	m.LastEncodingTime = time.Now()

	if sourceRemoved {
		m.TotalSourcesRemoved++
	}

	// Update average encoding time (whole seconds).
	seconds := int64(encodingTime.Seconds())
	if m.AverageEncodingTime == 0 {
		m.AverageEncodingTime = seconds
	} else {
		m.AverageEncodingTime = (m.AverageEncodingTime*4 + seconds) / 5
	}

	// Update average shard size; skipped when no shards were created so the
	// division below cannot divide by zero.
	if shardsCreated > 0 {
		shardSize := volumeSize / int64(shardsCreated)
		if m.AverageShardSize == 0 {
			m.AverageShardSize = shardSize
		} else {
			m.AverageShardSize = (m.AverageShardSize*4 + shardSize) / 5
		}
	}

	// Update average data/parity shard counts. Both are seeded together,
	// keyed on the data-shard average alone.
	if m.AverageDataShards == 0 {
		m.AverageDataShards = dataShards
		m.AverageParityShards = parityShards
	} else {
		m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
		m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
	}
}

// RecordFailure records a failed erasure coding operation.
func (m *ErasureCodingMetrics) RecordFailure() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.FailedOperations++
}

// RecordShardPlacement records shard placement for distribution tracking.
// It increments the counter for dataCenter and the counter for the rack,
// which is keyed as "<dataCenter>:<rack>".
func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Allocate the maps on demand so that a zero-value ErasureCodingMetrics
	// does not panic with a write to a nil map.
	if m.ShardsPerDataCenter == nil {
		m.ShardsPerDataCenter = make(map[string]int64)
	}
	if m.ShardsPerRack == nil {
		m.ShardsPerRack = make(map[string]int64)
	}

	m.ShardsPerDataCenter[dataCenter]++
	m.ShardsPerRack[dataCenter+":"+rack]++
}

// UpdateCurrentVolumeInfo updates current volume processing information.
func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.CurrentVolumeSize = volumeSize
	m.CurrentShardCount = shardCount
}

// SetVolumesPendingEncoding sets the number of volumes pending encoding.
func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.VolumesPendingEncoding = count
}

// UpdatePlacementSuccessRate updates the placement success rate.
//
// The first sample (while the stored rate is still 0) is stored as is; later
// samples are blended in as 0.8*previous + 0.2*rate.
func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.PlacementSuccessRate == 0 {
		m.PlacementSuccessRate = rate
		return
	}
	m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
}

// GetMetrics returns a copy of the current metrics (without the mutex).
//
// The maps in the returned value are fresh copies, so the caller may read or
// modify them without affecting, or racing with, the live metrics.
func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	// Deep-copy the maps; a plain struct copy would share them.
	shardsPerDC := make(map[string]int64, len(m.ShardsPerDataCenter))
	for k, v := range m.ShardsPerDataCenter {
		shardsPerDC[k] = v
	}

	shardsPerRack := make(map[string]int64, len(m.ShardsPerRack))
	for k, v := range m.ShardsPerRack {
		shardsPerRack[k] = v
	}

	// Build the result field by field so the lock value is never copied.
	return ErasureCodingMetrics{
		VolumesEncoded:         m.VolumesEncoded,
		TotalShardsCreated:     m.TotalShardsCreated,
		TotalDataProcessed:     m.TotalDataProcessed,
		TotalSourcesRemoved:    m.TotalSourcesRemoved,
		LastEncodingTime:       m.LastEncodingTime,
		AverageEncodingTime:    m.AverageEncodingTime,
		AverageShardSize:       m.AverageShardSize,
		AverageDataShards:      m.AverageDataShards,
		AverageParityShards:    m.AverageParityShards,
		SuccessfulOperations:   m.SuccessfulOperations,
		FailedOperations:       m.FailedOperations,
		ShardsPerDataCenter:    shardsPerDC,
		ShardsPerRack:          shardsPerRack,
		PlacementSuccessRate:   m.PlacementSuccessRate,
		CurrentVolumeSize:      m.CurrentVolumeSize,
		CurrentShardCount:      m.CurrentShardCount,
		VolumesPendingEncoding: m.VolumesPendingEncoding,
	}
}

// GetSuccessRate returns the success rate as a percentage.
// It returns 100 when no operation has been recorded yet.
func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	total := m.SuccessfulOperations + m.FailedOperations
	if total == 0 {
		return 100.0
	}
	return float64(m.SuccessfulOperations) / float64(total) * 100.0
}

// GetAverageDataProcessed returns the average data processed per volume,
// or 0 when no volume has been encoded.
func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if m.VolumesEncoded == 0 {
		return 0
	}
	return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
}

// GetSourceRemovalRate returns the percentage of sources removed after
// encoding, or 0 when no volume has been encoded.
func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if m.VolumesEncoded == 0 {
		return 0
	}
	return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
}

// Reset resets all metrics to zero, empties the distribution maps and sets
// LastEncodingTime to the current time.
func (m *ErasureCodingMetrics) Reset() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Reset field by field instead of assigning a fresh struct to *m: a
	// whole-struct assignment would also overwrite the mutex that is held
	// right now with its zero value, and the deferred Unlock would then abort
	// the program with "sync: Unlock of unlocked RWMutex".
	m.VolumesEncoded = 0
	m.TotalShardsCreated = 0
	m.TotalDataProcessed = 0
	m.TotalSourcesRemoved = 0
	m.LastEncodingTime = time.Now()

	m.AverageEncodingTime = 0
	m.AverageShardSize = 0
	m.AverageDataShards = 0
	m.AverageParityShards = 0
	m.SuccessfulOperations = 0
	m.FailedOperations = 0

	m.ShardsPerDataCenter = make(map[string]int64)
	m.ShardsPerRack = make(map[string]int64)
	m.PlacementSuccessRate = 0

	m.CurrentVolumeSize = 0
	m.CurrentShardCount = 0
	m.VolumesPendingEncoding = 0
}

// Global metrics instance for erasure coding tasks
var globalErasureCodingMetrics = NewErasureCodingMetrics()

// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance.
func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
	return globalErasureCodingMetrics
}
