diff options
Diffstat (limited to 'weed/admin/topology/active_topology.go')
| -rw-r--r-- | weed/admin/topology/active_topology.go | 425 |
1 files changed, 2 insertions, 423 deletions
diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go index bfa03a72f..e4ef5c5d0 100644 --- a/weed/admin/topology/active_topology.go +++ b/weed/admin/topology/active_topology.go @@ -1,98 +1,5 @@ package topology -import ( - "fmt" - "sync" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" -) - -// TaskType represents different types of maintenance operations -type TaskType string - -// TaskStatus represents the current status of a task -type TaskStatus string - -// Common task type constants -const ( - TaskTypeVacuum TaskType = "vacuum" - TaskTypeBalance TaskType = "balance" - TaskTypeErasureCoding TaskType = "erasure_coding" - TaskTypeReplication TaskType = "replication" -) - -// Common task status constants -const ( - TaskStatusPending TaskStatus = "pending" - TaskStatusInProgress TaskStatus = "in_progress" - TaskStatusCompleted TaskStatus = "completed" -) - -// taskState represents the current state of tasks affecting the topology (internal) -type taskState struct { - VolumeID uint32 `json:"volume_id"` - TaskType TaskType `json:"task_type"` - SourceServer string `json:"source_server"` - SourceDisk uint32 `json:"source_disk"` - TargetServer string `json:"target_server,omitempty"` - TargetDisk uint32 `json:"target_disk,omitempty"` - Status TaskStatus `json:"status"` - StartedAt time.Time `json:"started_at"` - CompletedAt time.Time `json:"completed_at,omitempty"` -} - -// DiskInfo represents a disk with its current state and ongoing tasks (public for external access) -type DiskInfo struct { - NodeID string `json:"node_id"` - DiskID uint32 `json:"disk_id"` - DiskType string `json:"disk_type"` - DataCenter string `json:"data_center"` - Rack string `json:"rack"` - DiskInfo *master_pb.DiskInfo `json:"disk_info"` - LoadCount int `json:"load_count"` // Number of active tasks -} - -// activeDisk represents internal disk state (private) -type activeDisk struct { - *DiskInfo - pendingTasks []*taskState - assignedTasks []*taskState - recentTasks []*taskState // Completed in last N seconds -} - -// activeNode represents a node with its disks (private) -type activeNode struct { - nodeID string - dataCenter string - rack string - nodeInfo *master_pb.DataNodeInfo - disks map[uint32]*activeDisk // DiskID -> activeDisk -} - -// ActiveTopology provides a real-time view of cluster state with task awareness -type ActiveTopology struct { - // Core topology from master - topologyInfo *master_pb.TopologyInfo - lastUpdated time.Time - - // Structured topology for easy access (private) - nodes map[string]*activeNode // NodeID -> activeNode - disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk - - // Task states affecting the topology (private) - pendingTasks map[string]*taskState - assignedTasks map[string]*taskState - recentTasks map[string]*taskState - - // Configuration - recentTaskWindowSeconds int - - // Synchronization - mutex sync.RWMutex -} - // NewActiveTopology creates a new ActiveTopology instance func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology { if recentTaskWindowSeconds <= 0 { @@ -102,339 +9,11 @@ func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology { return &ActiveTopology{ nodes: make(map[string]*activeNode), disks: make(map[string]*activeDisk), + volumeIndex: make(map[uint32][]string), + ecShardIndex: make(map[uint32][]string), pendingTasks: make(map[string]*taskState), assignedTasks: make(map[string]*taskState), recentTasks: make(map[string]*taskState), recentTaskWindowSeconds: recentTaskWindowSeconds, } } - -// UpdateTopology updates the topology information from master -func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error { - at.mutex.Lock() - defer at.mutex.Unlock() - - at.topologyInfo = topologyInfo - at.lastUpdated = time.Now() - - // Rebuild structured topology - at.nodes = make(map[string]*activeNode) - at.disks = make(map[string]*activeDisk) - - for _, dc := range topologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, nodeInfo := range rack.DataNodeInfos { - node := &activeNode{ - nodeID: nodeInfo.Id, - dataCenter: dc.Id, - rack: rack.Id, - nodeInfo: nodeInfo, - disks: make(map[uint32]*activeDisk), - } - - // Add disks for this node - for diskType, diskInfo := range nodeInfo.DiskInfos { - disk := &activeDisk{ - DiskInfo: &DiskInfo{ - NodeID: nodeInfo.Id, - DiskID: diskInfo.DiskId, - DiskType: diskType, - DataCenter: dc.Id, - Rack: rack.Id, - DiskInfo: diskInfo, - }, - } - - diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) - node.disks[diskInfo.DiskId] = disk - at.disks[diskKey] = disk - } - - at.nodes[nodeInfo.Id] = node - } - } - } - - // Reassign task states to updated topology - at.reassignTaskStates() - - glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks)) - return nil -} - -// AddPendingTask adds a pending task to the topology -func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32, - sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) { - at.mutex.Lock() - defer at.mutex.Unlock() - - task := &taskState{ - VolumeID: volumeID, - TaskType: taskType, - SourceServer: sourceServer, - SourceDisk: sourceDisk, - TargetServer: targetServer, - TargetDisk: targetDisk, - Status: TaskStatusPending, - StartedAt: time.Now(), - } - - at.pendingTasks[taskID] = task - at.assignTaskToDisk(task) -} - -// AssignTask moves a task from pending to assigned -func (at *ActiveTopology) AssignTask(taskID string) error { - at.mutex.Lock() - defer at.mutex.Unlock() - - task, exists := at.pendingTasks[taskID] - if !exists { - return fmt.Errorf("pending task %s not found", taskID) - } - - delete(at.pendingTasks, taskID) - task.Status = TaskStatusInProgress - at.assignedTasks[taskID] = task - at.reassignTaskStates() - - return nil -} - -// CompleteTask moves a task from assigned to recent -func (at *ActiveTopology) CompleteTask(taskID string) error { - at.mutex.Lock() - defer at.mutex.Unlock() - - task, exists := at.assignedTasks[taskID] - if !exists { - return fmt.Errorf("assigned task %s not found", taskID) - } - - delete(at.assignedTasks, taskID) - task.Status = TaskStatusCompleted - task.CompletedAt = time.Now() - at.recentTasks[taskID] = task - at.reassignTaskStates() - - // Clean up old recent tasks - at.cleanupRecentTasks() - - return nil -} - -// GetAvailableDisks returns disks that can accept new tasks of the given type -func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - - var available []*DiskInfo - - for _, disk := range at.disks { - if disk.NodeID == excludeNodeID { - continue // Skip excluded node - } - - if at.isDiskAvailable(disk, taskType) { - // Create a copy with current load count - diskCopy := *disk.DiskInfo - diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) - available = append(available, &diskCopy) - } - } - - return available -} - -// GetDiskLoad returns the current load on a disk (number of active tasks) -func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int { - at.mutex.RLock() - defer at.mutex.RUnlock() - - diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) - disk, exists := at.disks[diskKey] - if !exists { - return 0 - } - - return len(disk.pendingTasks) + len(disk.assignedTasks) -} - -// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection) -func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool { - at.mutex.RLock() - defer at.mutex.RUnlock() - - for _, task := range at.recentTasks { - if task.VolumeID == volumeID && task.TaskType == taskType { - return true - } - } - - return false -} - -// GetAllNodes returns information about all nodes (public interface) -func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - - result := make(map[string]*master_pb.DataNodeInfo) - for nodeID, node := range at.nodes { - result[nodeID] = node.nodeInfo - } - return result -} - -// GetTopologyInfo returns the current topology information (read-only access) -func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - return at.topologyInfo -} - -// GetNodeDisks returns all disks for a specific node -func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo { - at.mutex.RLock() - defer at.mutex.RUnlock() - - node, exists := at.nodes[nodeID] - if !exists { - return nil - } - - var disks []*DiskInfo - for _, disk := range node.disks { - diskCopy := *disk.DiskInfo - diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) - disks = append(disks, &diskCopy) - } - - return disks -} - -// DestinationPlan represents a planned destination for a volume/shard operation -type DestinationPlan struct { - TargetNode string `json:"target_node"` - TargetDisk uint32 `json:"target_disk"` - TargetRack string `json:"target_rack"` - TargetDC string `json:"target_dc"` - ExpectedSize uint64 `json:"expected_size"` - PlacementScore float64 `json:"placement_score"` - Conflicts []string `json:"conflicts"` -} - -// MultiDestinationPlan represents multiple planned destinations for operations like EC -type MultiDestinationPlan struct { - Plans []*DestinationPlan `json:"plans"` - TotalShards int `json:"total_shards"` - SuccessfulRack int `json:"successful_racks"` - SuccessfulDCs int `json:"successful_dcs"` -} - -// Private methods - -// reassignTaskStates assigns tasks to the appropriate disks -func (at *ActiveTopology) reassignTaskStates() { - // Clear existing task assignments - for _, disk := range at.disks { - disk.pendingTasks = nil - disk.assignedTasks = nil - disk.recentTasks = nil - } - - // Reassign pending tasks - for _, task := range at.pendingTasks { - at.assignTaskToDisk(task) - } - - // Reassign assigned tasks - for _, task := range at.assignedTasks { - at.assignTaskToDisk(task) - } - - // Reassign recent tasks - for _, task := range at.recentTasks { - at.assignTaskToDisk(task) - } -} - -// assignTaskToDisk assigns a task to the appropriate disk(s) -func (at *ActiveTopology) assignTaskToDisk(task *taskState) { - // Assign to source disk - sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk) - if sourceDisk, exists := at.disks[sourceKey]; exists { - switch task.Status { - case TaskStatusPending: - sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task) - case TaskStatusInProgress: - sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task) - case TaskStatusCompleted: - sourceDisk.recentTasks = append(sourceDisk.recentTasks, task) - } - } - - // Assign to target disk if it exists and is different from source - if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) { - targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk) - if targetDisk, exists := at.disks[targetKey]; exists { - switch task.Status { - case TaskStatusPending: - targetDisk.pendingTasks = append(targetDisk.pendingTasks, task) - case TaskStatusInProgress: - targetDisk.assignedTasks = append(targetDisk.assignedTasks, task) - case TaskStatusCompleted: - targetDisk.recentTasks = append(targetDisk.recentTasks, task) - } - } - } -} - -// isDiskAvailable checks if a disk can accept new tasks -func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool { - // Check if disk has too many active tasks - activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) - if activeLoad >= 2 { // Max 2 concurrent tasks per disk - return false - } - - // Check for conflicting task types - for _, task := range disk.assignedTasks { - if at.areTaskTypesConflicting(task.TaskType, taskType) { - return false - } - } - - return true -} - -// areTaskTypesConflicting checks if two task types conflict -func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool { - // Examples of conflicting task types - conflictMap := map[TaskType][]TaskType{ - TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding}, - TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding}, - TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance}, - } - - if conflicts, exists := conflictMap[existing]; exists { - for _, conflictType := range conflicts { - if conflictType == new { - return true - } - } - } - - return false -} - -// cleanupRecentTasks removes old recent tasks -func (at *ActiveTopology) cleanupRecentTasks() { - cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second) - - for taskID, task := range at.recentTasks { - if task.CompletedAt.Before(cutoff) { - delete(at.recentTasks, taskID) - } - } -} |
