aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/topology/structs.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-08-03 01:35:38 -0700
committerGitHub <noreply@github.com>2025-08-03 01:35:38 -0700
commit0ecb466eda3bda7d44ebabb22c4842c9f78589c6 (patch)
tree7e525c4e0d6ea161d82c671deda28900770d5dba /weed/admin/topology/structs.go
parent315fcc70b2121c45d3cc18cb3721e80e78171f8d (diff)
downloadseaweedfs-0ecb466eda3bda7d44ebabb22c4842c9f78589c6.tar.xz
seaweedfs-0ecb466eda3bda7d44ebabb22c4842c9f78589c6.zip
Admin: refactoring active topology (#7073)
* refactoring * add ec shard size * address comments * passing task id There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way. This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature. A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution. * 1 source multiple destinations * task supports multi source and destination * ec needs to clean up previous shards * use erasure coding constants * getPlanningCapacityUnsafe getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation * use CanAccommodate to calculate * remove dead code * address comments * fix Mutex Copying in Protobuf Structs * use constants * fix estimatedSize The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize. * at.assignTaskToDisk(task) * refactoring * Update weed/admin/topology/internal.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fail fast * fix compilation * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * indexes for volume and shard locations * dedup with ToVolumeSlots * return an additional boolean to indicate success, or an error * Update abstract_sql_store.go * fix * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/admin/topology/task_management.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * faster findVolumeDisk * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/admin/topology/storage_slot_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor * simplify * remove unused GetDiskStorageImpact function * refactor * add comments * Update weed/admin/topology/storage_impact.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/admin/topology/storage_slot_test.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update storage_impact.go * AddPendingTask The unified AddPendingTask function now serves as the single entry point for all task creation, successfully consolidating the previously separate functions while maintaining full functionality and improving code organization. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/admin/topology/structs.go')
-rw-r--r--weed/admin/topology/structs.go120
1 files changed, 120 insertions, 0 deletions
diff --git a/weed/admin/topology/structs.go b/weed/admin/topology/structs.go
new file mode 100644
index 000000000..f2d29eb5f
--- /dev/null
+++ b/weed/admin/topology/structs.go
@@ -0,0 +1,120 @@
+package topology
+
+import (
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+// TaskSource represents a single source in a multi-source task (for replicated volume cleanup)
+type TaskSource struct {
+ SourceServer string `json:"source_server"`
+ SourceDisk uint32 `json:"source_disk"`
+ StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this source
+ EstimatedSize int64 `json:"estimated_size"` // Estimated size for this source
+}
+
+// TaskDestination represents a single destination in a multi-destination task
+type TaskDestination struct {
+ TargetServer string `json:"target_server"`
+ TargetDisk uint32 `json:"target_disk"`
+ StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this destination
+ EstimatedSize int64 `json:"estimated_size"` // Estimated size for this destination
+}
+
+// taskState represents the current state of tasks affecting the topology (internal)
+// Uses unified multi-source/multi-destination design:
+// - Single-source tasks (balance, vacuum, replication): 1 source, 1 destination
+// - Multi-source EC tasks (replicated volumes): N sources, M destinations
+type taskState struct {
+ VolumeID uint32 `json:"volume_id"`
+ TaskType TaskType `json:"task_type"`
+ Status TaskStatus `json:"status"`
+ StartedAt time.Time `json:"started_at"`
+ CompletedAt time.Time `json:"completed_at,omitempty"`
+ EstimatedSize int64 `json:"estimated_size"` // Total estimated size of task
+
+ // Unified source and destination arrays (always used)
+ Sources []TaskSource `json:"sources"` // Source locations (1+ for all task types)
+ Destinations []TaskDestination `json:"destinations"` // Destination locations (1+ for all task types)
+}
+
+// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
+type DiskInfo struct {
+ NodeID string `json:"node_id"`
+ DiskID uint32 `json:"disk_id"`
+ DiskType string `json:"disk_type"`
+ DataCenter string `json:"data_center"`
+ Rack string `json:"rack"`
+ DiskInfo *master_pb.DiskInfo `json:"disk_info"`
+ LoadCount int `json:"load_count"` // Number of active tasks
+}
+
+// activeDisk represents internal disk state (private)
+type activeDisk struct {
+ *DiskInfo
+ pendingTasks []*taskState
+ assignedTasks []*taskState
+ recentTasks []*taskState // Completed in last N seconds
+}
+
+// activeNode represents a node with its disks (private)
+type activeNode struct {
+ nodeID string
+ dataCenter string
+ rack string
+ nodeInfo *master_pb.DataNodeInfo
+ disks map[uint32]*activeDisk // DiskID -> activeDisk
+}
+
+// ActiveTopology provides a real-time view of cluster state with task awareness
+type ActiveTopology struct {
+ // Core topology from master
+ topologyInfo *master_pb.TopologyInfo
+ lastUpdated time.Time
+
+ // Structured topology for easy access (private)
+ nodes map[string]*activeNode // NodeID -> activeNode
+ disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
+
+ // Performance indexes for O(1) lookups (private)
+ volumeIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where volume replicas exist
+ ecShardIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where EC shards exist
+
+ // Task states affecting the topology (private)
+ pendingTasks map[string]*taskState
+ assignedTasks map[string]*taskState
+ recentTasks map[string]*taskState
+
+ // Configuration
+ recentTaskWindowSeconds int
+
+ // Synchronization
+ mutex sync.RWMutex
+}
+
+// DestinationPlan represents a planned destination for a volume/shard operation
+type DestinationPlan struct {
+ TargetNode string `json:"target_node"`
+ TargetDisk uint32 `json:"target_disk"`
+ TargetRack string `json:"target_rack"`
+ TargetDC string `json:"target_dc"`
+ ExpectedSize uint64 `json:"expected_size"`
+ PlacementScore float64 `json:"placement_score"`
+ Conflicts []string `json:"conflicts"`
+}
+
+// MultiDestinationPlan represents multiple planned destinations for operations like EC
+type MultiDestinationPlan struct {
+ Plans []*DestinationPlan `json:"plans"`
+ TotalShards int `json:"total_shards"`
+ SuccessfulRack int `json:"successful_racks"`
+ SuccessfulDCs int `json:"successful_dcs"`
+}
+
+// VolumeReplica represents a replica location with server and disk information
+type VolumeReplica struct {
+ ServerID string `json:"server_id"`
+ DiskID uint32 `json:"disk_id"`
+}