about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: chrislu <chris.lu@gmail.com> 2025-08-17 21:19:33 -0700
committer: chrislu <chris.lu@gmail.com> 2025-08-17 21:19:33 -0700
commit: 49b90bc9a3037cb5df4d5cef92fdff981ff4d0f5 (patch)
tree: c6653e295e7209d8ac9b57d5f4465f1bc6387f83
parent: e0f33846d1655732f4aca9252027f9a47ebf2165 (diff)
download: seaweedfs-49b90bc9a3037cb5df4d5cef92fdff981ff4d0f5.tar.xz
download: seaweedfs-49b90bc9a3037cb5df4d5cef92fdff981ff4d0f5.zip
Finds the most complete/reliable generation
-rw-r--r-- weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go 151
-rw-r--r-- weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go 487
2 files changed, 585 insertions, 53 deletions
diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go
index 7ef8a92db..f110cba5c 100644
--- a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go
+++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go
@@ -45,6 +45,7 @@ type VacuumPlan struct {
}
// DetermineGenerationsFromParams extracts generation information from task parameters
+// When sources span multiple generations, the most complete reconstructable one becomes the source and the target is the highest generation seen plus one.
func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.TaskParams) (sourceGen, targetGen uint32, err error) {
if params == nil {
return 0, 0, fmt.Errorf("task parameters cannot be nil")
@@ -55,29 +56,27 @@ func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.Tas
return 0, 1, nil
}
- // Use generation from first source (all sources should have same generation)
- if params.Sources[0].Generation > 0 {
- sourceGen = params.Sources[0].Generation
- targetGen = sourceGen + 1
- } else {
- // Generation 0 case
- sourceGen = 0
- targetGen = 1
+ // Group sources by generation and analyze completeness
+ generationAnalysis, err := logic.AnalyzeGenerationCompleteness(params)
+ if err != nil {
+ return 0, 0, fmt.Errorf("failed to analyze generation completeness: %w", err)
}
- // Validate consistency - all sources should have the same generation
- for i, source := range params.Sources {
- if source.Generation != sourceGen {
- return 0, 0, fmt.Errorf("inconsistent generations in sources: source[0]=%d, source[%d]=%d",
- sourceGen, i, source.Generation)
- }
+ // Find the most complete generation that can be used for reconstruction
+ mostCompleteGen, found := logic.FindMostCompleteGeneration(generationAnalysis)
+ if !found {
+ return 0, 0, fmt.Errorf("no generation has sufficient shards for reconstruction")
}
- return sourceGen, targetGen, nil
+ // Target generation is max(all generations) + 1
+ maxGen := logic.FindMaxGeneration(generationAnalysis)
+ targetGen = maxGen + 1
+
+ return mostCompleteGen, targetGen, nil
}
-// ParseSourceNodes extracts source node information from task parameters
-func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[pb.ServerAddress]erasure_coding.ShardBits, error) {
+// ParseSourceNodes extracts source node information from task parameters for a specific generation
+func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams, targetGeneration uint32) (map[pb.ServerAddress]erasure_coding.ShardBits, error) {
if params == nil {
return nil, fmt.Errorf("task parameters cannot be nil")
}
@@ -85,7 +84,7 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[
sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits)
for _, source := range params.Sources {
- if source.Node == "" {
+ if source.Node == "" || source.Generation != targetGeneration {
continue
}
@@ -105,7 +104,7 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[
}
if len(sourceNodes) == 0 {
- return nil, fmt.Errorf("no valid source nodes found: sources=%d", len(params.Sources))
+ return nil, fmt.Errorf("no valid source nodes found for generation %d: sources=%d", targetGeneration, len(params.Sources))
}
return sourceNodes, nil
@@ -113,14 +112,14 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[
// CreateVacuumPlan creates a comprehensive plan for the EC vacuum operation
func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string, params *worker_pb.TaskParams) (*VacuumPlan, error) {
- // Extract generations
+ // Extract generations and analyze completeness
sourceGen, targetGen, err := logic.DetermineGenerationsFromParams(params)
if err != nil {
return nil, fmt.Errorf("failed to determine generations: %w", err)
}
- // Parse source nodes
- sourceNodes, err := logic.ParseSourceNodes(params)
+ // Parse source nodes from the selected generation
+ sourceNodes, err := logic.ParseSourceNodes(params, sourceGen)
if err != nil {
return nil, fmt.Errorf("failed to parse source nodes: %w", err)
}
@@ -137,8 +136,18 @@ func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string,
Nodes: sourceNodes, // Same nodes, new generation
}
- // Determine what to cleanup (simplified: just source generation)
- generationsToCleanup := []uint32{sourceGen}
+ // Get all available generations for cleanup calculation
+ generationAnalysis, err := logic.AnalyzeGenerationCompleteness(params)
+ if err != nil {
+ return nil, fmt.Errorf("failed to analyze generations for cleanup: %w", err)
+ }
+
+ // All generations except target should be cleaned up
+ var allGenerations []uint32
+ for generation := range generationAnalysis {
+ allGenerations = append(allGenerations, generation)
+ }
+ generationsToCleanup := logic.CalculateCleanupGenerations(sourceGen, targetGen, allGenerations)
// Generate safety checks
safetyChecks := logic.generateSafetyChecks(sourceDistribution, targetGen)
@@ -243,6 +252,100 @@ type CleanupImpact struct {
ShardsToDelete int
}
+// GenerationAnalysis summarizes, for one EC generation, which shards the task sources report and whether they suffice for reconstruction.
+type GenerationAnalysis struct {
+ Generation uint32 // generation number shared by every source grouped into this entry
+ ShardBits erasure_coding.ShardBits // accumulated (ShardBits.Plus) over the valid shard IDs of this generation's sources
+ ShardCount int // ShardBits.ShardIdCount(), filled in by AnalyzeGenerationCompleteness
+ Nodes map[pb.ServerAddress]erasure_coding.ShardBits // shard bits recorded per source node address
+ CanReconstruct bool // true when ShardCount >= erasure_coding.DataShardsCount
+}
+
+// AnalyzeGenerationCompleteness groups params.Sources by generation and reports, per generation, the shards seen (overall and per node) and whether at least erasure_coding.DataShardsCount are present; it returns an error only for nil params.
+func (logic *EcVacuumLogic) AnalyzeGenerationCompleteness(params *worker_pb.TaskParams) (map[uint32]*GenerationAnalysis, error) {
+ if params == nil {
+ return nil, fmt.Errorf("task parameters cannot be nil")
+ }
+
+ generationMap := make(map[uint32]*GenerationAnalysis)
+
+ // Group sources by generation
+ for _, source := range params.Sources {
+ if source.Node == "" { // sources without a node address are ignored entirely
+ continue
+ }
+
+ generation := source.Generation
+ if _, exists := generationMap[generation]; !exists { // the entry is created before shard IDs are validated, so it exists even if none turn out valid
+ generationMap[generation] = &GenerationAnalysis{
+ Generation: generation,
+ ShardBits: erasure_coding.ShardBits(0),
+ Nodes: make(map[pb.ServerAddress]erasure_coding.ShardBits),
+ }
+ }
+
+ analysis := generationMap[generation]
+ serverAddr := pb.ServerAddress(source.Node)
+ var shardBits erasure_coding.ShardBits
+
+ // Convert shard IDs to ShardBits
+ for _, shardId := range source.ShardIds {
+ if shardId < erasure_coding.TotalShardsCount { // out-of-range shard IDs are dropped
+ shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ }
+
+ if shardBits.ShardIdCount() > 0 { // a source with no valid shard IDs contributes nothing to Nodes or ShardBits
+ analysis.Nodes[serverAddr] = analysis.Nodes[serverAddr].Plus(shardBits) // merge instead of overwrite so a node listed twice keeps all its shards, matching ShardBits below
+ analysis.ShardBits = analysis.ShardBits.Plus(shardBits)
+ }
+ }
+
+ // Calculate completeness for each generation
+ for _, analysis := range generationMap {
+ analysis.ShardCount = analysis.ShardBits.ShardIdCount()
+ analysis.CanReconstruct = analysis.ShardCount >= erasure_coding.DataShardsCount
+ }
+
+ return generationMap, nil
+}
+
+// FindMostCompleteGeneration returns the generation with the largest ShardCount among entries whose CanReconstruct is true,
+// breaking ties by the higher generation number; it returns (0, false) when no entry can reconstruct.
+func (logic *EcVacuumLogic) FindMostCompleteGeneration(generationMap map[uint32]*GenerationAnalysis) (uint32, bool) {
+ var bestGeneration uint32
+ var bestShardCount int
+ found := false // distinguishes "nothing chosen yet" from a real choice of generation 0
+
+ for generation, analysis := range generationMap {
+ // Only consider generations that can reconstruct
+ if !analysis.CanReconstruct {
+ continue
+ }
+
+ // Prefer the generation with the most shards, or if tied, the highest generation number
+ if !found || analysis.ShardCount > bestShardCount ||
+ (analysis.ShardCount == bestShardCount && generation > bestGeneration) { // the tie-break keeps the result independent of map iteration order
+ bestGeneration = generation
+ bestShardCount = analysis.ShardCount
+ found = true
+ }
+ }
+
+ return bestGeneration, found
+}
+
+// FindMaxGeneration returns the largest key in generationMap; an empty or nil map yields 0, which is indistinguishable from a map holding only generation 0.
+func (logic *EcVacuumLogic) FindMaxGeneration(generationMap map[uint32]*GenerationAnalysis) uint32 {
+ var maxGen uint32
+ for generation := range generationMap { // only the keys are inspected; the analysis values are not read
+ if generation > maxGen {
+ maxGen = generation
+ }
+ }
+ return maxGen
+}
+
// countShardsToDelete counts how many shard files will be deleted
func (logic *EcVacuumLogic) countShardsToDelete(plan *VacuumPlan) int {
totalShards := 0
diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go
index e4bc242fa..60de16b05 100644
--- a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go
+++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go
@@ -35,7 +35,11 @@ func TestDetermineGenerationsFromParams(t *testing.T) {
name: "generation 0 source",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
- {Generation: 0},
+ {
+ Node: "node1:8080",
+ Generation: 0,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
+ },
},
},
expectSrc: 0,
@@ -45,7 +49,11 @@ func TestDetermineGenerationsFromParams(t *testing.T) {
name: "generation 1 source",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
- {Generation: 1},
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
+ },
},
},
expectSrc: 1,
@@ -55,29 +63,54 @@ func TestDetermineGenerationsFromParams(t *testing.T) {
name: "generation 5 source",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
- {Generation: 5},
+ {
+ Node: "node1:8080",
+ Generation: 5,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
+ },
},
},
expectSrc: 5,
expectTgt: 6,
},
{
- name: "inconsistent generations",
+ name: "multiple generations - finds most complete",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
- {Generation: 1},
- {Generation: 2}, // Different generation!
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2}, // Only 3 shards - insufficient
+ },
+ {
+ Node: "node2:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
+ },
},
},
- expectError: true,
+ expectSrc: 2, // Should pick generation 2 (most complete)
+ expectTgt: 3, // Target should be max(1,2) + 1 = 3
},
{
name: "multiple sources same generation",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
- {Generation: 3},
- {Generation: 3},
- {Generation: 3},
+ {
+ Node: "node1:8080",
+ Generation: 3,
+ ShardIds: []uint32{0, 1, 2, 3, 4},
+ },
+ {
+ Node: "node2:8080",
+ Generation: 3,
+ ShardIds: []uint32{5, 6, 7, 8, 9}, // Combined = 10 shards - sufficient
+ },
+ {
+ Node: "node3:8080",
+ Generation: 3,
+ ShardIds: []uint32{10, 11, 12, 13},
+ },
},
},
expectSrc: 3,
@@ -118,6 +151,7 @@ func TestParseSourceNodes(t *testing.T) {
tests := []struct {
name string
params *worker_pb.TaskParams
+ generation uint32
expectNodes int
expectShards map[string][]int // node -> shard IDs
expectError bool
@@ -125,6 +159,7 @@ func TestParseSourceNodes(t *testing.T) {
{
name: "nil params",
params: nil,
+ generation: 0,
expectError: true,
},
{
@@ -132,6 +167,7 @@ func TestParseSourceNodes(t *testing.T) {
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{},
},
+ generation: 0,
expectError: true,
},
{
@@ -139,11 +175,13 @@ func TestParseSourceNodes(t *testing.T) {
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
- Node: "node1:8080",
- ShardIds: []uint32{0, 1, 2, 3, 4, 5},
+ Node: "node1:8080",
+ Generation: 0,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5},
},
},
},
+ generation: 0,
expectNodes: 1,
expectShards: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4, 5},
@@ -154,19 +192,23 @@ func TestParseSourceNodes(t *testing.T) {
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
- Node: "node1:8080",
- ShardIds: []uint32{0, 1, 2, 3, 4},
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2, 3, 4},
},
{
- Node: "node2:8080",
- ShardIds: []uint32{5, 6, 7, 8, 9},
+ Node: "node2:8080",
+ Generation: 1,
+ ShardIds: []uint32{5, 6, 7, 8, 9},
},
{
- Node: "node3:8080",
- ShardIds: []uint32{10, 11, 12, 13},
+ Node: "node3:8080",
+ Generation: 1,
+ ShardIds: []uint32{10, 11, 12, 13},
},
},
},
+ generation: 1,
expectNodes: 3,
expectShards: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
@@ -179,15 +221,18 @@ func TestParseSourceNodes(t *testing.T) {
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
- Node: "node1:8080",
- ShardIds: []uint32{0, 1, 2},
+ Node: "node1:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 1, 2},
},
{
- Node: "node2:8080",
- ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes
+ Node: "node2:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes
},
},
},
+ generation: 2,
expectNodes: 2,
expectShards: map[string][]int{
"node1:8080": {0, 1, 2},
@@ -199,15 +244,18 @@ func TestParseSourceNodes(t *testing.T) {
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
- Node: "",
- ShardIds: []uint32{0, 1, 2},
+ Node: "",
+ Generation: 3,
+ ShardIds: []uint32{0, 1, 2},
},
{
- Node: "node1:8080",
- ShardIds: []uint32{3, 4, 5},
+ Node: "node1:8080",
+ Generation: 3,
+ ShardIds: []uint32{3, 4, 5},
},
},
},
+ generation: 3,
expectNodes: 1,
expectShards: map[string][]int{
"node1:8080": {3, 4, 5},
@@ -218,21 +266,51 @@ func TestParseSourceNodes(t *testing.T) {
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
- Node: "node1:8080",
- ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid
+ Node: "node1:8080",
+ Generation: 4,
+ ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid
},
},
},
+ generation: 4,
expectNodes: 1,
expectShards: map[string][]int{
"node1:8080": {0, 1}, // Only valid shards
},
},
+ {
+ name: "filter by generation - only matching generation",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2},
+ },
+ {
+ Node: "node2:8080",
+ Generation: 2, // Different generation - should be ignored
+ ShardIds: []uint32{3, 4, 5},
+ },
+ {
+ Node: "node3:8080",
+ Generation: 1, // Same generation - should be included
+ ShardIds: []uint32{6, 7, 8},
+ },
+ },
+ },
+ generation: 1,
+ expectNodes: 2,
+ expectShards: map[string][]int{
+ "node1:8080": {0, 1, 2},
+ "node3:8080": {6, 7, 8},
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- sourceNodes, err := logic.ParseSourceNodes(tt.params)
+ sourceNodes, err := logic.ParseSourceNodes(tt.params, tt.generation)
if tt.expectError {
if err == nil {
@@ -685,3 +763,354 @@ func createRealisticTopologyTest(t *testing.T) {
func TestRealisticTopologyScenarios(t *testing.T) {
t.Run("3-node distributed shards", createRealisticTopologyTest)
}
+
+func TestAnalyzeGenerationCompleteness(t *testing.T) { // table-driven: checks which generations are reported and each one's CanReconstruct flag
+ logic := NewEcVacuumLogic()
+
+ tests := []struct {
+ name string
+ params *worker_pb.TaskParams
+ expectedGenerations []uint32 // generations that must appear as keys in the result
+ expectedCanReconstruct map[uint32]bool // generation -> expected CanReconstruct value
+ expectError bool
+ }{
+ {
+ name: "nil params",
+ params: nil,
+ expectError: true,
+ },
+ {
+ name: "single generation sufficient shards",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards = sufficient
+ },
+ },
+ },
+ expectedGenerations: []uint32{1},
+ expectedCanReconstruct: map[uint32]bool{1: true},
+ },
+ {
+ name: "single generation insufficient shards",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2}, // Only 3 shards = insufficient
+ },
+ },
+ },
+ expectedGenerations: []uint32{1},
+ expectedCanReconstruct: map[uint32]bool{1: false},
+ },
+ {
+ name: "multiple generations mixed completeness",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2}, // 3 shards - insufficient
+ },
+ {
+ Node: "node2:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient
+ },
+ {
+ Node: "node3:8080",
+ Generation: 3,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5}, // 6 shards - insufficient
+ },
+ },
+ },
+ expectedGenerations: []uint32{1, 2, 3},
+ expectedCanReconstruct: map[uint32]bool{1: false, 2: true, 3: false},
+ },
+ {
+ name: "multiple nodes same generation",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2, 3, 4},
+ },
+ {
+ Node: "node2:8080",
+ Generation: 1,
+ ShardIds: []uint32{5, 6, 7, 8, 9}, // Together = 10 shards = sufficient
+ },
+ },
+ },
+ expectedGenerations: []uint32{1},
+ expectedCanReconstruct: map[uint32]bool{1: true},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ analysis, err := logic.AnalyzeGenerationCompleteness(tt.params)
+
+ if tt.expectError {
+ if err == nil {
+ t.Errorf("expected error but got none")
+ }
+ return
+ }
+
+ if err != nil {
+ t.Errorf("unexpected error: %v", err)
+ return
+ }
+
+ // Check we have the expected generations
+ if len(analysis) != len(tt.expectedGenerations) {
+ t.Errorf("generation count: expected %d, got %d", len(tt.expectedGenerations), len(analysis))
+ return
+ }
+
+ for _, expectedGen := range tt.expectedGenerations {
+ genAnalysis, exists := analysis[expectedGen]
+ if !exists {
+ t.Errorf("expected generation %d not found", expectedGen)
+ continue
+ }
+
+ expectedCanReconstruct := tt.expectedCanReconstruct[expectedGen] // a generation absent from the expectation map is expected to be false
+ if genAnalysis.CanReconstruct != expectedCanReconstruct {
+ t.Errorf("generation %d CanReconstruct: expected %v, got %v",
+ expectedGen, expectedCanReconstruct, genAnalysis.CanReconstruct)
+ }
+ }
+ })
+ }
+}
+
+func TestFindMostCompleteGeneration(t *testing.T) { // table-driven: selection by shard count, tie-break by generation, and the not-found cases
+ logic := NewEcVacuumLogic()
+
+ tests := []struct {
+ name string
+ generationAnalysis map[uint32]*GenerationAnalysis // hand-built input; only ShardCount and CanReconstruct are read by the function under test
+ expectedGeneration uint32
+ expectedFound bool
+ }{
+ {
+ name: "empty analysis",
+ generationAnalysis: map[uint32]*GenerationAnalysis{},
+ expectedFound: false,
+ },
+ {
+ name: "single reconstructable generation",
+ generationAnalysis: map[uint32]*GenerationAnalysis{
+ 1: {Generation: 1, ShardCount: 10, CanReconstruct: true},
+ },
+ expectedGeneration: 1,
+ expectedFound: true,
+ },
+ {
+ name: "no reconstructable generations",
+ generationAnalysis: map[uint32]*GenerationAnalysis{
+ 1: {Generation: 1, ShardCount: 5, CanReconstruct: false},
+ 2: {Generation: 2, ShardCount: 3, CanReconstruct: false},
+ },
+ expectedFound: false,
+ },
+ {
+ name: "multiple reconstructable - picks most complete",
+ generationAnalysis: map[uint32]*GenerationAnalysis{
+ 1: {Generation: 1, ShardCount: 10, CanReconstruct: true},
+ 2: {Generation: 2, ShardCount: 14, CanReconstruct: true}, // Most complete
+ 3: {Generation: 3, ShardCount: 12, CanReconstruct: true},
+ },
+ expectedGeneration: 2,
+ expectedFound: true,
+ },
+ {
+ name: "tie in shard count - picks higher generation",
+ generationAnalysis: map[uint32]*GenerationAnalysis{
+ 1: {Generation: 1, ShardCount: 10, CanReconstruct: true},
+ 2: {Generation: 2, ShardCount: 10, CanReconstruct: true}, // Same count, higher generation
+ },
+ expectedGeneration: 2,
+ expectedFound: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ generation, found := logic.FindMostCompleteGeneration(tt.generationAnalysis)
+
+ if found != tt.expectedFound {
+ t.Errorf("found: expected %v, got %v", tt.expectedFound, found)
+ return
+ }
+
+ if tt.expectedFound && generation != tt.expectedGeneration { // the returned generation is only checked when a match is expected
+ t.Errorf("generation: expected %d, got %d", tt.expectedGeneration, generation)
+ }
+ })
+ }
+}
+
+func TestFindMaxGeneration(t *testing.T) { // table-driven: the highest map key wins, and an empty map yields 0
+ logic := NewEcVacuumLogic()
+
+ tests := []struct {
+ name string
+ generationAnalysis map[uint32]*GenerationAnalysis // only the keys matter to the function under test
+ expectedMax uint32
+ }{
+ {
+ name: "empty analysis",
+ generationAnalysis: map[uint32]*GenerationAnalysis{},
+ expectedMax: 0,
+ },
+ {
+ name: "single generation",
+ generationAnalysis: map[uint32]*GenerationAnalysis{
+ 5: {Generation: 5},
+ },
+ expectedMax: 5,
+ },
+ {
+ name: "multiple generations",
+ generationAnalysis: map[uint32]*GenerationAnalysis{
+ 1: {Generation: 1},
+ 5: {Generation: 5},
+ 3: {Generation: 3},
+ 7: {Generation: 7}, // Highest
+ },
+ expectedMax: 7,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ maxGen := logic.FindMaxGeneration(tt.generationAnalysis)
+
+ if maxGen != tt.expectedMax {
+ t.Errorf("max generation: expected %d, got %d", tt.expectedMax, maxGen)
+ }
+ })
+ }
+}
+
+func TestMultiGenerationVacuumScenarios(t *testing.T) { // drives CreateVacuumPlan end to end: source/target generation choice and size of the cleanup list
+ logic := NewEcVacuumLogic()
+
+ tests := []struct {
+ name string
+ params *worker_pb.TaskParams
+ expectedSourceGen uint32 // compared against plan.CurrentGeneration
+ expectedTargetGen uint32 // compared against plan.TargetGeneration
+ expectedCleanupCount int // compared against len(plan.GenerationsToCleanup)
+ expectError bool
+ }{
+ {
+ name: "corrupted generation 1, good generation 2",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2}, // Insufficient - corrupted data
+ },
+ {
+ Node: "node2:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // Complete - good data
+ },
+ },
+ },
+ expectedSourceGen: 2, // Should use generation 2
+ expectedTargetGen: 3, // max(1,2) + 1 = 3
+ expectedCleanupCount: 2, // Clean up generations 1 and 2
+ },
+ {
+ name: "multiple old generations, one current good",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 0,
+ ShardIds: []uint32{0, 1}, // Old incomplete
+ },
+ {
+ Node: "node2:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2, 3}, // Old incomplete
+ },
+ {
+ Node: "node3:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, // Complete - all shards
+ },
+ },
+ },
+ expectedSourceGen: 2, // Should use generation 2 (most complete)
+ expectedTargetGen: 3, // max(0,1,2) + 1 = 3
+ expectedCleanupCount: 3, // Clean up generations 0, 1, and 2
+ },
+ {
+ name: "no sufficient generations",
+ params: &worker_pb.TaskParams{
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: "node1:8080",
+ Generation: 1,
+ ShardIds: []uint32{0, 1, 2}, // Only 3 shards - insufficient
+ },
+ {
+ Node: "node2:8080",
+ Generation: 2,
+ ShardIds: []uint32{0, 1}, // Only 2 shards - insufficient
+ },
+ },
+ },
+ expectError: true, // No generation has enough shards
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plan, err := logic.CreateVacuumPlan(123, "test", tt.params)
+
+ if tt.expectError {
+ if err == nil {
+ t.Errorf("expected error but got none")
+ }
+ return
+ }
+
+ if err != nil {
+ t.Errorf("unexpected error: %v", err)
+ return
+ }
+
+ if plan.CurrentGeneration != tt.expectedSourceGen {
+ t.Errorf("source generation: expected %d, got %d", tt.expectedSourceGen, plan.CurrentGeneration)
+ }
+
+ if plan.TargetGeneration != tt.expectedTargetGen {
+ t.Errorf("target generation: expected %d, got %d", tt.expectedTargetGen, plan.TargetGeneration)
+ }
+
+ if len(plan.GenerationsToCleanup) != tt.expectedCleanupCount {
+ t.Errorf("cleanup count: expected %d, got %d", tt.expectedCleanupCount, len(plan.GenerationsToCleanup))
+ }
+
+ // Verify cleanup generations don't include target
+ for _, gen := range plan.GenerationsToCleanup {
+ if gen == plan.TargetGeneration {
+ t.Errorf("cleanup generations should not include target generation %d", plan.TargetGeneration)
+ }
+ }
+ })
+ }
+}