diff options
Diffstat (limited to 'weed/admin/maintenance/pending_operations.go')
| -rw-r--r-- | weed/admin/maintenance/pending_operations.go | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/weed/admin/maintenance/pending_operations.go b/weed/admin/maintenance/pending_operations.go new file mode 100644 index 000000000..16130b4c9 --- /dev/null +++ b/weed/admin/maintenance/pending_operations.go @@ -0,0 +1,311 @@ +package maintenance + +import ( + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// PendingOperationType represents the type of pending operation +type PendingOperationType string + +const ( + OpTypeVolumeMove PendingOperationType = "volume_move" + OpTypeVolumeBalance PendingOperationType = "volume_balance" + OpTypeErasureCoding PendingOperationType = "erasure_coding" + OpTypeVacuum PendingOperationType = "vacuum" + OpTypeReplication PendingOperationType = "replication" +) + +// PendingOperation represents a pending volume/shard operation +type PendingOperation struct { + VolumeID uint32 `json:"volume_id"` + OperationType PendingOperationType `json:"operation_type"` + SourceNode string `json:"source_node"` + DestNode string `json:"dest_node,omitempty"` // Empty for non-movement operations + TaskID string `json:"task_id"` + StartTime time.Time `json:"start_time"` + EstimatedSize uint64 `json:"estimated_size"` // Bytes + Collection string `json:"collection"` + Status string `json:"status"` // "assigned", "in_progress", "completing" +} + +// PendingOperations tracks all pending volume/shard operations +type PendingOperations struct { + // Operations by volume ID for conflict detection + byVolumeID map[uint32]*PendingOperation + + // Operations by task ID for updates + byTaskID map[string]*PendingOperation + + // Operations by node for capacity calculations + bySourceNode map[string][]*PendingOperation + byDestNode map[string][]*PendingOperation + + mutex sync.RWMutex +} + +// NewPendingOperations creates a new pending operations tracker +func NewPendingOperations() *PendingOperations { + return &PendingOperations{ + byVolumeID: make(map[uint32]*PendingOperation), + byTaskID: make(map[string]*PendingOperation), + bySourceNode: make(map[string][]*PendingOperation), + byDestNode: make(map[string][]*PendingOperation), + } +} + +// AddOperation adds a pending operation +func (po *PendingOperations) AddOperation(op *PendingOperation) { + po.mutex.Lock() + defer po.mutex.Unlock() + + // Check for existing operation on this volume + if existing, exists := po.byVolumeID[op.VolumeID]; exists { + glog.V(1).Infof("Replacing existing pending operation on volume %d: %s -> %s", + op.VolumeID, existing.TaskID, op.TaskID) + po.removeOperationUnlocked(existing) + } + + // Add new operation + po.byVolumeID[op.VolumeID] = op + po.byTaskID[op.TaskID] = op + + // Add to node indexes + po.bySourceNode[op.SourceNode] = append(po.bySourceNode[op.SourceNode], op) + if op.DestNode != "" { + po.byDestNode[op.DestNode] = append(po.byDestNode[op.DestNode], op) + } + + glog.V(2).Infof("Added pending operation: volume %d, type %s, task %s, %s -> %s", + op.VolumeID, op.OperationType, op.TaskID, op.SourceNode, op.DestNode) +} + +// RemoveOperation removes a completed operation +func (po *PendingOperations) RemoveOperation(taskID string) { + po.mutex.Lock() + defer po.mutex.Unlock() + + if op, exists := po.byTaskID[taskID]; exists { + po.removeOperationUnlocked(op) + glog.V(2).Infof("Removed completed operation: volume %d, task %s", op.VolumeID, taskID) + } +} + +// removeOperationUnlocked removes an operation (must hold lock) +func (po *PendingOperations) removeOperationUnlocked(op *PendingOperation) { + delete(po.byVolumeID, op.VolumeID) + delete(po.byTaskID, op.TaskID) + + // Remove from source node list + if ops, exists := po.bySourceNode[op.SourceNode]; exists { + for i, other := range ops { + if other.TaskID == op.TaskID { + po.bySourceNode[op.SourceNode] = append(ops[:i], ops[i+1:]...) + break + } + } + } + + // Remove from dest node list + if op.DestNode != "" { + if ops, exists := po.byDestNode[op.DestNode]; exists { + for i, other := range ops { + if other.TaskID == op.TaskID { + po.byDestNode[op.DestNode] = append(ops[:i], ops[i+1:]...) + break + } + } + } + } +} + +// HasPendingOperationOnVolume checks if a volume has a pending operation +func (po *PendingOperations) HasPendingOperationOnVolume(volumeID uint32) bool { + po.mutex.RLock() + defer po.mutex.RUnlock() + + _, exists := po.byVolumeID[volumeID] + return exists +} + +// GetPendingOperationOnVolume returns the pending operation on a volume +func (po *PendingOperations) GetPendingOperationOnVolume(volumeID uint32) *PendingOperation { + po.mutex.RLock() + defer po.mutex.RUnlock() + + return po.byVolumeID[volumeID] +} + +// WouldConflictWithPending checks if a new operation would conflict with pending ones +func (po *PendingOperations) WouldConflictWithPending(volumeID uint32, opType PendingOperationType) bool { + po.mutex.RLock() + defer po.mutex.RUnlock() + + if existing, exists := po.byVolumeID[volumeID]; exists { + // Volume already has a pending operation + glog.V(3).Infof("Volume %d conflict: already has %s operation (task %s)", + volumeID, existing.OperationType, existing.TaskID) + return true + } + + return false +} + +// GetPendingCapacityImpactForNode calculates pending capacity changes for a node +func (po *PendingOperations) GetPendingCapacityImpactForNode(nodeID string) (incoming uint64, outgoing uint64) { + po.mutex.RLock() + defer po.mutex.RUnlock() + + // Calculate outgoing capacity (volumes leaving this node) + if ops, exists := po.bySourceNode[nodeID]; exists { + for _, op := range ops { + // Only count movement operations + if op.DestNode != "" { + outgoing += op.EstimatedSize + } + } + } + + // Calculate incoming capacity (volumes coming to this node) + if ops, exists := po.byDestNode[nodeID]; exists { + for _, op := range ops { + incoming += op.EstimatedSize + } + } + + return incoming, outgoing +} + +// FilterVolumeMetricsExcludingPending filters out volumes with pending operations +func (po *PendingOperations) FilterVolumeMetricsExcludingPending(metrics []*types.VolumeHealthMetrics) []*types.VolumeHealthMetrics { + po.mutex.RLock() + defer po.mutex.RUnlock() + + var filtered []*types.VolumeHealthMetrics + excludedCount := 0 + + for _, metric := range metrics { + if _, hasPending := po.byVolumeID[metric.VolumeID]; !hasPending { + filtered = append(filtered, metric) + } else { + excludedCount++ + glog.V(3).Infof("Excluding volume %d from scan due to pending operation", metric.VolumeID) + } + } + + if excludedCount > 0 { + glog.V(1).Infof("Filtered out %d volumes with pending operations from %d total volumes", + excludedCount, len(metrics)) + } + + return filtered +} + +// GetNodeCapacityProjection calculates projected capacity for a node +func (po *PendingOperations) GetNodeCapacityProjection(nodeID string, currentUsed uint64, totalCapacity uint64) NodeCapacityProjection { + incoming, outgoing := po.GetPendingCapacityImpactForNode(nodeID) + + projectedUsed := currentUsed + incoming - outgoing + projectedFree := totalCapacity - projectedUsed + + return NodeCapacityProjection{ + NodeID: nodeID, + CurrentUsed: currentUsed, + TotalCapacity: totalCapacity, + PendingIncoming: incoming, + PendingOutgoing: outgoing, + ProjectedUsed: projectedUsed, + ProjectedFree: projectedFree, + } +} + +// GetAllPendingOperations returns all pending operations +func (po *PendingOperations) GetAllPendingOperations() []*PendingOperation { + po.mutex.RLock() + defer po.mutex.RUnlock() + + var operations []*PendingOperation + for _, op := range po.byVolumeID { + operations = append(operations, op) + } + + return operations +} + +// UpdateOperationStatus updates the status of a pending operation +func (po *PendingOperations) UpdateOperationStatus(taskID string, status string) { + po.mutex.Lock() + defer po.mutex.Unlock() + + if op, exists := po.byTaskID[taskID]; exists { + op.Status = status + glog.V(3).Infof("Updated operation status: task %s, volume %d -> %s", taskID, op.VolumeID, status) + } +} + +// CleanupStaleOperations removes operations that have been running too long +func (po *PendingOperations) CleanupStaleOperations(maxAge time.Duration) int { + po.mutex.Lock() + defer po.mutex.Unlock() + + cutoff := time.Now().Add(-maxAge) + var staleOps []*PendingOperation + + for _, op := range po.byVolumeID { + if op.StartTime.Before(cutoff) { + staleOps = append(staleOps, op) + } + } + + for _, op := range staleOps { + po.removeOperationUnlocked(op) + glog.Warningf("Removed stale pending operation: volume %d, task %s, age %v", + op.VolumeID, op.TaskID, time.Since(op.StartTime)) + } + + return len(staleOps) +} + +// NodeCapacityProjection represents projected capacity for a node +type NodeCapacityProjection struct { + NodeID string `json:"node_id"` + CurrentUsed uint64 `json:"current_used"` + TotalCapacity uint64 `json:"total_capacity"` + PendingIncoming uint64 `json:"pending_incoming"` + PendingOutgoing uint64 `json:"pending_outgoing"` + ProjectedUsed uint64 `json:"projected_used"` + ProjectedFree uint64 `json:"projected_free"` +} + +// GetStats returns statistics about pending operations +func (po *PendingOperations) GetStats() PendingOperationsStats { + po.mutex.RLock() + defer po.mutex.RUnlock() + + stats := PendingOperationsStats{ + TotalOperations: len(po.byVolumeID), + ByType: make(map[PendingOperationType]int), + ByStatus: make(map[string]int), + } + + var totalSize uint64 + for _, op := range po.byVolumeID { + stats.ByType[op.OperationType]++ + stats.ByStatus[op.Status]++ + totalSize += op.EstimatedSize + } + + stats.TotalEstimatedSize = totalSize + return stats +} + +// PendingOperationsStats provides statistics about pending operations +type PendingOperationsStats struct { + TotalOperations int `json:"total_operations"` + ByType map[PendingOperationType]int `json:"by_type"` + ByStatus map[string]int `json:"by_status"` + TotalEstimatedSize uint64 `json:"total_estimated_size"` +} |
