Diffstat (limited to 'weed/worker/tasks')
-rw-r--r--  weed/worker/tasks/balance/detection.go        |  34
-rw-r--r--  weed/worker/tasks/base/volume_utils.go        |  36
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go | 169
-rw-r--r--  weed/worker/tasks/vacuum/detection.go         |   6
4 files changed, 232 insertions(+), 13 deletions(-)
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 102f532a8..be03fb92f 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+ // Generate task ID for ActiveTopology integration
+ taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
+
task := &types.TaskDetectionResult{
+ TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
Server: selectedVolume.Server,
@@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Create typed parameters with destination information
task.TypedParams = &worker_pb.TaskParams{
+ TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: selectedVolume.VolumeID,
Server: selectedVolume.Server,
Collection: selectedVolume.Collection,
@@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
+
+ // Add pending balance task to ActiveTopology for capacity management
+
+ // Find the actual disk containing the volume on the source server
+ sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+ if !found {
+ return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
+ selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+ }
+ targetDisk := destinationPlan.TargetDisk
+
+ err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+ TaskID: taskID,
+ TaskType: topology.TaskTypeBalance,
+ VolumeID: selectedVolume.VolumeID,
+ VolumeSize: int64(selectedVolume.Size),
+ Sources: []topology.TaskSourceSpec{
+ {ServerID: selectedVolume.Server, DiskID: sourceDisk},
+ },
+ Destinations: []topology.TaskDestinationSpec{
+ {ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
+ }
+
+ glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
+ taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
} else {
glog.Warningf("No ActiveTopology available for destination planning in balance detection")
return nil, nil
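
The same generated task ID is written in three places above: the TaskDetectionResult, the typed protobuf TaskParams, and the ActiveTopology pending task. That shared ID is what lets the scheduler later match a running worker task back to the capacity it reserved. A minimal sketch of the three-way linkage, where vol, active, srcDisk, dst, and dstDisk are assumed placeholder variables (the field names are taken from the diff):

	taskID := fmt.Sprintf("balance_vol_%d_%d", vol.VolumeID, time.Now().Unix())

	result := &types.TaskDetectionResult{TaskID: taskID, TaskType: types.TaskTypeBalance, VolumeID: vol.VolumeID}
	result.TypedParams = &worker_pb.TaskParams{TaskId: taskID, VolumeId: vol.VolumeID}

	err := active.AddPendingTask(topology.TaskSpec{
		TaskID:       taskID, // must equal result.TaskID so the reserved capacity can be released later
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     vol.VolumeID,
		Sources:      []topology.TaskSourceSpec{{ServerID: vol.Server, DiskID: srcDisk}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: dst, DiskID: dstDisk}},
	})
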
diff --git a/weed/worker/tasks/base/volume_utils.go b/weed/worker/tasks/base/volume_utils.go
new file mode 100644
index 000000000..2aaf795b2
--- /dev/null
+++ b/weed/worker/tasks/base/volume_utils.go
@@ -0,0 +1,36 @@
+package base
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/admin/topology"
+)
+
+// FindVolumeDisk finds the disk ID where a specific volume is located on a given server.
+// Returns the disk ID and a boolean indicating whether the volume was found.
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+//
+// This is a shared utility function used by multiple task detection algorithms
+// (balance, vacuum, etc.) to locate volumes efficiently.
+//
+// Example usage:
+//
+// // In balance task: find source disk for a volume that needs to be moved
+// sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer)
+//
+// // In vacuum task: find disk containing volume that needs cleanup
+// diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID)
+func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) {
+ if activeTopology == nil {
+ return 0, false
+ }
+
+ // Use the new O(1) indexed lookup for better performance
+ locations := activeTopology.GetVolumeLocations(volumeID, collection)
+ for _, loc := range locations {
+ if loc.ServerID == serverID {
+ return loc.DiskID, true
+ }
+ }
+
+ // Volume not found on this server
+ return 0, false
+}
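
A minimal, hypothetical caller for the helper above. FindVolumeDisk returns (0, false) both when the topology is nil and when the volume is absent from the server, and 0 is itself a valid disk ID, so callers should branch on the boolean rather than on the returned disk ID:

	sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, volumeID, collection, server)
	if !found {
		// disk ID 0 is legitimate, so the boolean is the only reliable "not found" signal
		return fmt.Errorf("volume %d not found on %s", volumeID, server)
	}
	glog.V(2).Infof("volume %d lives on disk %d of %s", volumeID, sourceDisk, server)
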
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 9cf87cdf6..ec632436f 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,7 +61,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ // Generate task ID for ActiveTopology integration
+ taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
+
result := &types.TaskDetectionResult{
+ TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
@@ -81,12 +85,117 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
continue // Skip this volume if destination planning fails
}
- // Find all volume replicas from topology
- replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+ // Calculate expected shard size for EC operation
+ // Each data shard will be approximately volumeSize / dataShards
+ expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
+
+ // Add pending EC shard task to ActiveTopology for capacity management
+
+ // Extract shard destinations from multiPlan
+ var shardDestinations []string
+ var shardDiskIDs []uint32
+ for _, plan := range multiPlan.Plans {
+ shardDestinations = append(shardDestinations, plan.TargetNode)
+ shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
+ }
+
+ // Find all volume replica locations (server + disk) from topology
+ replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+ if len(replicaLocations) == 0 {
+ glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
+ continue
+ }
+
+ // Find existing EC shards from previous failed attempts
+ existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+
+ // Combine volume replicas and existing EC shards for cleanup
+ var allSourceLocations []topology.TaskSourceLocation
+
+ // Add volume replicas (will free volume slots)
+ for _, replica := range replicaLocations {
+ allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ ServerID: replica.ServerID,
+ DiskID: replica.DiskID,
+ CleanupType: topology.CleanupVolumeReplica,
+ })
+ }
+
+ // Add existing EC shards (will free shard slots)
+ duplicateCheck := make(map[string]bool)
+ for _, replica := range replicaLocations {
+ key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
+ duplicateCheck[key] = true
+ }
+
+ for _, shard := range existingECShards {
+ key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
+ if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
+ allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ ServerID: shard.ServerID,
+ DiskID: shard.DiskID,
+ CleanupType: topology.CleanupECShards,
+ })
+ duplicateCheck[key] = true
+ }
+ }
+
+ glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
+ len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
+
+ // Convert TaskSourceLocation to TaskSourceSpec
+ sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
+ for i, srcLoc := range allSourceLocations {
+ sources[i] = topology.TaskSourceSpec{
+ ServerID: srcLoc.ServerID,
+ DiskID: srcLoc.DiskID,
+ CleanupType: srcLoc.CleanupType,
+ }
+ }
+
+ // Convert shard destinations to TaskDestinationSpec
+ destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
+ shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
+ shardSize := int64(expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = topology.TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &shardSize,
+ }
+ }
+
+ err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+ TaskID: taskID,
+ TaskType: topology.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ VolumeSize: int64(metric.Size),
+ Sources: sources,
+ Destinations: destinations,
+ })
+ if err != nil {
+ glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
+ continue // Skip this volume if topology task addition fails
+ }
+
+ glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
+ taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
+
+ // Find all volume replicas from topology (for legacy worker compatibility)
+ var replicas []string
+ serverSet := make(map[string]struct{})
+ for _, loc := range replicaLocations {
+ if _, found := serverSet[loc.ServerID]; !found {
+ replicas = append(replicas, loc.ServerID)
+ serverSet[loc.ServerID] = struct{}{}
+ }
+ }
glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
// Create typed parameters with EC destination information and replicas
result.TypedParams = &worker_pb.TaskParams{
+ TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
@@ -143,6 +252,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// planECDestinations plans the destinations for erasure coding operation
// This function implements EC destination planning logic directly in the detection phase
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+ // Calculate expected shard size for EC operation
+ expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
+
// Get source node information from topology
var sourceRack, sourceDC string
@@ -168,10 +280,12 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
}
}
- // Get available disks for EC placement (include source node for EC)
- availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
+ // Get available disks for EC placement with effective capacity consideration (includes pending tasks)
+ // For EC, each shard consumes one volume slot, so a disk needs at least 1
+ // available slot to be considered for placement.
+ availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
if len(availableDisks) < erasure_coding.MinTotalDisks {
- return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", erasure_coding.MinTotalDisks, len(availableDisks))
+ return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
}
// Select best disks for EC placement with rack/DC diversity
@@ -190,7 +304,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
TargetDisk: disk.DiskID,
TargetRack: disk.Rack,
TargetDC: disk.DataCenter,
- ExpectedSize: 0, // EC shards don't have predetermined size
+ ExpectedSize: expectedShardSize, // Set calculated EC shard size
PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
}
@@ -202,6 +316,22 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
dcCount[disk.DataCenter]++
}
+ // Log capacity utilization information using ActiveTopology's encapsulated logic
+ totalEffectiveCapacity := int64(0)
+ for _, plan := range plans {
+ effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
+ totalEffectiveCapacity += effectiveCapacity
+ }
+
+ glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
+ metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)
+
+ // Log storage impact for the EC task (source only; each target is handled individually).
+ // The source reserves capacity but with zero StorageSlotChange impact.
+ sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
+ glog.V(2).Infof("EC task capacity management: source reserves with zero impact (VolumeSlots:%d, ShardSlots:%d), %d targets will receive shards, estimated size=%d",
+ sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
+
return &topology.MultiDestinationPlan{
Plans: plans,
TotalShards: len(plans),
@@ -354,13 +484,8 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
return false
}
- // Check if disk has capacity
- if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
- return false
- }
-
- // Check if disk is not overloaded
- if disk.LoadCount > 10 { // Arbitrary threshold
+ // Check if disk is not overloaded with tasks
+ if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
return false
}
@@ -380,6 +505,24 @@ func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC str
return conflicts
}
+// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
+ if activeTopology == nil {
+ return nil
+ }
+ return activeTopology.GetVolumeLocations(volumeID, collection)
+}
+
+// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts)
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
+ if activeTopology == nil {
+ return nil
+ }
+ return activeTopology.GetECShardLocations(volumeID, collection)
+}
+
// findVolumeReplicas finds all servers that have replicas of the specified volume
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
if activeTopology == nil {
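
For reference, the expectedShardSize computed above is a plain integer division of the volume size by the data-shard count. A worked sketch, assuming SeaweedFS's default 10+4 erasure coding layout (erasure_coding.DataShardsCount = 10):

	volumeSize := uint64(30 << 30) // 30 GiB volume
	expectedShardSize := volumeSize / uint64(erasure_coding.DataShardsCount) // about 3 GiB per shard

	// All 14 shards (10 data + 4 parity) are roughly this size, so each
	// TaskDestinationSpec reserves about one shard's worth of capacity via
	// its EstimatedSize and StorageImpact fields.
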
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
index 23f82ad34..0c14bb956 100644
--- a/weed/worker/tasks/vacuum/detection.go
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -1,6 +1,7 @@
package vacuum
import (
+ "fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -31,7 +32,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
priority = types.TaskPriorityHigh
}
+ // Generate task ID for future ActiveTopology integration
+ taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix())
+
result := &types.TaskDetectionResult{
+ TaskID: taskID, // For future ActiveTopology integration
TaskType: types.TaskTypeVacuum,
VolumeID: metric.VolumeID,
Server: metric.Server,
@@ -96,6 +101,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
// Create typed protobuf parameters
return &worker_pb.TaskParams{
+ TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated)
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
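
All three detectors in this change mint IDs with the same "<task>_vol_<volumeID>_<unixSeconds>" shape ("balance_vol_...", "ec_vol_...", "vacuum_vol_..."). A hypothetical helper, not part of the change itself, that recovers the detector and volume from such an ID (imports strings and strconv):

	func parseTaskID(id string) (taskType string, volumeID uint32, ok bool) {
		parts := strings.SplitN(id, "_vol_", 2)
		if len(parts) != 2 {
			return "", 0, false
		}
		fields := strings.SplitN(parts[1], "_", 2)
		vid, err := strconv.ParseUint(fields[0], 10, 32)
		if err != nil {
			return "", 0, false
		}
		return parts[0], uint32(vid), true
	}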