diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/balance/monitoring.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/balance/monitoring.go')
| -rw-r--r-- | weed/worker/tasks/balance/monitoring.go | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/weed/worker/tasks/balance/monitoring.go b/weed/worker/tasks/balance/monitoring.go new file mode 100644 index 000000000..517de2484 --- /dev/null +++ b/weed/worker/tasks/balance/monitoring.go @@ -0,0 +1,138 @@ +package balance + +import ( + "sync" + "time" +) + +// BalanceMetrics contains balance-specific monitoring data +type BalanceMetrics struct { + // Execution metrics + VolumesBalanced int64 `json:"volumes_balanced"` + TotalDataTransferred int64 `json:"total_data_transferred"` + AverageImbalance float64 `json:"average_imbalance"` + LastBalanceTime time.Time `json:"last_balance_time"` + + // Performance metrics + AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"` + TotalExecutionTime int64 `json:"total_execution_time_seconds"` + SuccessfulOperations int64 `json:"successful_operations"` + FailedOperations int64 `json:"failed_operations"` + + // Current task metrics + CurrentImbalanceScore float64 `json:"current_imbalance_score"` + PlannedDestinations int `json:"planned_destinations"` + + mutex sync.RWMutex +} + +// NewBalanceMetrics creates a new balance metrics instance +func NewBalanceMetrics() *BalanceMetrics { + return &BalanceMetrics{ + LastBalanceTime: time.Now(), + } +} + +// RecordVolumeBalanced records a successful volume balance operation +func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.VolumesBalanced++ + m.TotalDataTransferred += volumeSize + m.SuccessfulOperations++ + m.LastBalanceTime = time.Now() + m.TotalExecutionTime += int64(transferTime.Seconds()) + + // Calculate average transfer speed (MB/s) + if transferTime > 0 { + speedMBps := float64(volumeSize) / (1024 * 1024) / transferTime.Seconds() + if m.AverageTransferSpeed == 0 { + m.AverageTransferSpeed = speedMBps + } else { + // Exponential moving average + m.AverageTransferSpeed = 0.8*m.AverageTransferSpeed + 0.2*speedMBps + } + } +} + +// RecordFailure records a failed balance operation +func (m *BalanceMetrics) RecordFailure() { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.FailedOperations++ +} + +// UpdateImbalanceScore updates the current cluster imbalance score +func (m *BalanceMetrics) UpdateImbalanceScore(score float64) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.CurrentImbalanceScore = score + + // Update average imbalance with exponential moving average + if m.AverageImbalance == 0 { + m.AverageImbalance = score + } else { + m.AverageImbalance = 0.9*m.AverageImbalance + 0.1*score + } +} + +// SetPlannedDestinations sets the number of planned destinations +func (m *BalanceMetrics) SetPlannedDestinations(count int) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.PlannedDestinations = count +} + +// GetMetrics returns a copy of the current metrics (without the mutex) +func (m *BalanceMetrics) GetMetrics() BalanceMetrics { + m.mutex.RLock() + defer m.mutex.RUnlock() + + // Create a copy without the mutex to avoid copying lock value + return BalanceMetrics{ + VolumesBalanced: m.VolumesBalanced, + TotalDataTransferred: m.TotalDataTransferred, + AverageImbalance: m.AverageImbalance, + LastBalanceTime: m.LastBalanceTime, + AverageTransferSpeed: m.AverageTransferSpeed, + TotalExecutionTime: m.TotalExecutionTime, + SuccessfulOperations: m.SuccessfulOperations, + FailedOperations: m.FailedOperations, + CurrentImbalanceScore: m.CurrentImbalanceScore, + PlannedDestinations: m.PlannedDestinations, + } +} + +// GetSuccessRate returns the success rate as a percentage +func (m *BalanceMetrics) GetSuccessRate() float64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + + total := m.SuccessfulOperations + m.FailedOperations + if total == 0 { + return 100.0 + } + return float64(m.SuccessfulOperations) / float64(total) * 100.0 +} + +// Reset resets all metrics to zero +func (m *BalanceMetrics) Reset() { + m.mutex.Lock() + defer m.mutex.Unlock() + + *m = BalanceMetrics{ + LastBalanceTime: time.Now(), + } +} + +// Global metrics instance for balance tasks +var globalBalanceMetrics = NewBalanceMetrics() + +// GetGlobalBalanceMetrics returns the global balance metrics instance +func GetGlobalBalanceMetrics() *BalanceMetrics { + return globalBalanceMetrics +} |
