Diffstat (limited to 'weed/admin/topology/active_topology.go')
-rw-r--r--  weed/admin/topology/active_topology.go  425
1 file changed, 2 insertions(+), 423 deletions(-)
diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go
index bfa03a72f..e4ef5c5d0 100644
--- a/weed/admin/topology/active_topology.go
+++ b/weed/admin/topology/active_topology.go
@@ -1,98 +1,5 @@
package topology
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
-)
-
-// TaskType represents different types of maintenance operations
-type TaskType string
-
-// TaskStatus represents the current status of a task
-type TaskStatus string
-
-// Common task type constants
-const (
- TaskTypeVacuum TaskType = "vacuum"
- TaskTypeBalance TaskType = "balance"
- TaskTypeErasureCoding TaskType = "erasure_coding"
- TaskTypeReplication TaskType = "replication"
-)
-
-// Common task status constants
-const (
- TaskStatusPending TaskStatus = "pending"
- TaskStatusInProgress TaskStatus = "in_progress"
- TaskStatusCompleted TaskStatus = "completed"
-)
-
-// taskState represents the current state of tasks affecting the topology (internal)
-type taskState struct {
- VolumeID uint32 `json:"volume_id"`
- TaskType TaskType `json:"task_type"`
- SourceServer string `json:"source_server"`
- SourceDisk uint32 `json:"source_disk"`
- TargetServer string `json:"target_server,omitempty"`
- TargetDisk uint32 `json:"target_disk,omitempty"`
- Status TaskStatus `json:"status"`
- StartedAt time.Time `json:"started_at"`
- CompletedAt time.Time `json:"completed_at,omitempty"`
-}
-
-// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
-type DiskInfo struct {
- NodeID string `json:"node_id"`
- DiskID uint32 `json:"disk_id"`
- DiskType string `json:"disk_type"`
- DataCenter string `json:"data_center"`
- Rack string `json:"rack"`
- DiskInfo *master_pb.DiskInfo `json:"disk_info"`
- LoadCount int `json:"load_count"` // Number of active tasks
-}
-
-// activeDisk represents internal disk state (private)
-type activeDisk struct {
- *DiskInfo
- pendingTasks []*taskState
- assignedTasks []*taskState
- recentTasks []*taskState // Completed in last N seconds
-}
-
-// activeNode represents a node with its disks (private)
-type activeNode struct {
- nodeID string
- dataCenter string
- rack string
- nodeInfo *master_pb.DataNodeInfo
- disks map[uint32]*activeDisk // DiskID -> activeDisk
-}
-
-// ActiveTopology provides a real-time view of cluster state with task awareness
-type ActiveTopology struct {
- // Core topology from master
- topologyInfo *master_pb.TopologyInfo
- lastUpdated time.Time
-
- // Structured topology for easy access (private)
- nodes map[string]*activeNode // NodeID -> activeNode
- disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
-
- // Task states affecting the topology (private)
- pendingTasks map[string]*taskState
- assignedTasks map[string]*taskState
- recentTasks map[string]*taskState
-
- // Configuration
- recentTaskWindowSeconds int
-
- // Synchronization
- mutex sync.RWMutex
-}
-
// NewActiveTopology creates a new ActiveTopology instance
func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
if recentTaskWindowSeconds <= 0 {
@@ -102,339 +9,11 @@ func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
return &ActiveTopology{
nodes: make(map[string]*activeNode),
disks: make(map[string]*activeDisk),
+ volumeIndex: make(map[uint32][]string),
+ ecShardIndex: make(map[uint32][]string),
pendingTasks: make(map[string]*taskState),
assignedTasks: make(map[string]*taskState),
recentTasks: make(map[string]*taskState),
recentTaskWindowSeconds: recentTaskWindowSeconds,
}
}
-
-// UpdateTopology updates the topology information from master
-func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- at.topologyInfo = topologyInfo
- at.lastUpdated = time.Now()
-
- // Rebuild structured topology
- at.nodes = make(map[string]*activeNode)
- at.disks = make(map[string]*activeDisk)
-
- for _, dc := range topologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, nodeInfo := range rack.DataNodeInfos {
- node := &activeNode{
- nodeID: nodeInfo.Id,
- dataCenter: dc.Id,
- rack: rack.Id,
- nodeInfo: nodeInfo,
- disks: make(map[uint32]*activeDisk),
- }
-
- // Add disks for this node
- for diskType, diskInfo := range nodeInfo.DiskInfos {
- disk := &activeDisk{
- DiskInfo: &DiskInfo{
- NodeID: nodeInfo.Id,
- DiskID: diskInfo.DiskId,
- DiskType: diskType,
- DataCenter: dc.Id,
- Rack: rack.Id,
- DiskInfo: diskInfo,
- },
- }
-
- diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
- node.disks[diskInfo.DiskId] = disk
- at.disks[diskKey] = disk
- }
-
- at.nodes[nodeInfo.Id] = node
- }
- }
- }
-
- // Reassign task states to updated topology
- at.reassignTaskStates()
-
- glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks))
- return nil
-}
-
-// AddPendingTask adds a pending task to the topology
-func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32,
- sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- task := &taskState{
- VolumeID: volumeID,
- TaskType: taskType,
- SourceServer: sourceServer,
- SourceDisk: sourceDisk,
- TargetServer: targetServer,
- TargetDisk: targetDisk,
- Status: TaskStatusPending,
- StartedAt: time.Now(),
- }
-
- at.pendingTasks[taskID] = task
- at.assignTaskToDisk(task)
-}
-
-// AssignTask moves a task from pending to assigned
-func (at *ActiveTopology) AssignTask(taskID string) error {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- task, exists := at.pendingTasks[taskID]
- if !exists {
- return fmt.Errorf("pending task %s not found", taskID)
- }
-
- delete(at.pendingTasks, taskID)
- task.Status = TaskStatusInProgress
- at.assignedTasks[taskID] = task
- at.reassignTaskStates()
-
- return nil
-}
-
-// CompleteTask moves a task from assigned to recent
-func (at *ActiveTopology) CompleteTask(taskID string) error {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- task, exists := at.assignedTasks[taskID]
- if !exists {
- return fmt.Errorf("assigned task %s not found", taskID)
- }
-
- delete(at.assignedTasks, taskID)
- task.Status = TaskStatusCompleted
- task.CompletedAt = time.Now()
- at.recentTasks[taskID] = task
- at.reassignTaskStates()
-
- // Clean up old recent tasks
- at.cleanupRecentTasks()
-
- return nil
-}
-
-// GetAvailableDisks returns disks that can accept new tasks of the given type
-func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- var available []*DiskInfo
-
- for _, disk := range at.disks {
- if disk.NodeID == excludeNodeID {
- continue // Skip excluded node
- }
-
- if at.isDiskAvailable(disk, taskType) {
- // Create a copy with current load count
- diskCopy := *disk.DiskInfo
- diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
- available = append(available, &diskCopy)
- }
- }
-
- return available
-}
-
-// GetDiskLoad returns the current load on a disk (number of active tasks)
-func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
- disk, exists := at.disks[diskKey]
- if !exists {
- return 0
- }
-
- return len(disk.pendingTasks) + len(disk.assignedTasks)
-}
-
-// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
-func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- for _, task := range at.recentTasks {
- if task.VolumeID == volumeID && task.TaskType == taskType {
- return true
- }
- }
-
- return false
-}
-
-// GetAllNodes returns information about all nodes (public interface)
-func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- result := make(map[string]*master_pb.DataNodeInfo)
- for nodeID, node := range at.nodes {
- result[nodeID] = node.nodeInfo
- }
- return result
-}
-
-// GetTopologyInfo returns the current topology information (read-only access)
-func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
- return at.topologyInfo
-}
-
-// GetNodeDisks returns all disks for a specific node
-func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- node, exists := at.nodes[nodeID]
- if !exists {
- return nil
- }
-
- var disks []*DiskInfo
- for _, disk := range node.disks {
- diskCopy := *disk.DiskInfo
- diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
- disks = append(disks, &diskCopy)
- }
-
- return disks
-}
-
-// DestinationPlan represents a planned destination for a volume/shard operation
-type DestinationPlan struct {
- TargetNode string `json:"target_node"`
- TargetDisk uint32 `json:"target_disk"`
- TargetRack string `json:"target_rack"`
- TargetDC string `json:"target_dc"`
- ExpectedSize uint64 `json:"expected_size"`
- PlacementScore float64 `json:"placement_score"`
- Conflicts []string `json:"conflicts"`
-}
-
-// MultiDestinationPlan represents multiple planned destinations for operations like EC
-type MultiDestinationPlan struct {
- Plans []*DestinationPlan `json:"plans"`
- TotalShards int `json:"total_shards"`
- SuccessfulRack int `json:"successful_racks"`
- SuccessfulDCs int `json:"successful_dcs"`
-}
-
-// Private methods
-
-// reassignTaskStates assigns tasks to the appropriate disks
-func (at *ActiveTopology) reassignTaskStates() {
- // Clear existing task assignments
- for _, disk := range at.disks {
- disk.pendingTasks = nil
- disk.assignedTasks = nil
- disk.recentTasks = nil
- }
-
- // Reassign pending tasks
- for _, task := range at.pendingTasks {
- at.assignTaskToDisk(task)
- }
-
- // Reassign assigned tasks
- for _, task := range at.assignedTasks {
- at.assignTaskToDisk(task)
- }
-
- // Reassign recent tasks
- for _, task := range at.recentTasks {
- at.assignTaskToDisk(task)
- }
-}
-
-// assignTaskToDisk assigns a task to the appropriate disk(s)
-func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
- // Assign to source disk
- sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk)
- if sourceDisk, exists := at.disks[sourceKey]; exists {
- switch task.Status {
- case TaskStatusPending:
- sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task)
- case TaskStatusInProgress:
- sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task)
- case TaskStatusCompleted:
- sourceDisk.recentTasks = append(sourceDisk.recentTasks, task)
- }
- }
-
- // Assign to target disk if it exists and is different from source
- if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) {
- targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk)
- if targetDisk, exists := at.disks[targetKey]; exists {
- switch task.Status {
- case TaskStatusPending:
- targetDisk.pendingTasks = append(targetDisk.pendingTasks, task)
- case TaskStatusInProgress:
- targetDisk.assignedTasks = append(targetDisk.assignedTasks, task)
- case TaskStatusCompleted:
- targetDisk.recentTasks = append(targetDisk.recentTasks, task)
- }
- }
- }
-}
-
-// isDiskAvailable checks if a disk can accept new tasks
-func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
- // Check if disk has too many active tasks
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- if activeLoad >= 2 { // Max 2 concurrent tasks per disk
- return false
- }
-
- // Check for conflicting task types
- for _, task := range disk.assignedTasks {
- if at.areTaskTypesConflicting(task.TaskType, taskType) {
- return false
- }
- }
-
- return true
-}
-
-// areTaskTypesConflicting checks if two task types conflict
-func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
- // Examples of conflicting task types
- conflictMap := map[TaskType][]TaskType{
- TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
- TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
- TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
- }
-
- if conflicts, exists := conflictMap[existing]; exists {
- for _, conflictType := range conflicts {
- if conflictType == new {
- return true
- }
- }
- }
-
- return false
-}
-
-// cleanupRecentTasks removes old recent tasks
-func (at *ActiveTopology) cleanupRecentTasks() {
- cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
-
- for taskID, task := range at.recentTasks {
- if task.CompletedAt.Before(cutoff) {
- delete(at.recentTasks, taskID)
- }
- }
-}
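
For reference, the methods removed above implement a pending -> assigned -> recent task lifecycle. The sketch below shows how a caller drives that lifecycle, assuming these methods remain available elsewhere in the topology package after this refactor; the task ID, volume ID, and server addresses are illustrative.

package topology

import "fmt"

// Example_taskLifecycle sketches the pending -> assigned -> recent lifecycle
// described by the removed methods (identifiers below are illustrative).
func Example_taskLifecycle() {
	at := NewActiveTopology(10) // 10-second recent-task window

	// Queue a balance task moving volume 1001 from disk 0 on the source
	// server to disk 0 on the target server.
	at.AddPendingTask("task-1", TaskTypeBalance, 1001,
		"10.0.0.1:8080", 0, "10.0.0.2:8080", 0)

	// Hand the task to a worker, then record its completion.
	if err := at.AssignTask("task-1"); err != nil {
		fmt.Println(err)
	}
	if err := at.CompleteTask("task-1"); err != nil {
		fmt.Println(err)
	}

	// Within the recent-task window the volume is not re-detected.
	fmt.Println(at.HasRecentTaskForVolume(1001, TaskTypeBalance))
	// Output: true
}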