diff options
| author | chrislu <chris.lu@gmail.com> | 2025-08-17 21:19:33 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-08-17 21:19:33 -0700 |
| commit | 49b90bc9a3037cb5df4d5cef92fdff981ff4d0f5 (patch) | |
| tree | c6653e295e7209d8ac9b57d5f4465f1bc6387f83 | |
| parent | e0f33846d1655732f4aca9252027f9a47ebf2165 (diff) | |
| download | seaweedfs-49b90bc9a3037cb5df4d5cef92fdff981ff4d0f5.tar.xz seaweedfs-49b90bc9a3037cb5df4d5cef92fdff981ff4d0f5.zip | |
Finds the most complete/reliable generation
| -rw-r--r-- | weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go | 151 | ||||
| -rw-r--r-- | weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go | 487 |
2 files changed, 585 insertions, 53 deletions
diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go index 7ef8a92db..f110cba5c 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go @@ -45,6 +45,7 @@ type VacuumPlan struct { } // DetermineGenerationsFromParams extracts generation information from task parameters +// Now supports multiple generations and finds the most complete one for vacuum func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.TaskParams) (sourceGen, targetGen uint32, err error) { if params == nil { return 0, 0, fmt.Errorf("task parameters cannot be nil") @@ -55,29 +56,27 @@ func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.Tas return 0, 1, nil } - // Use generation from first source (all sources should have same generation) - if params.Sources[0].Generation > 0 { - sourceGen = params.Sources[0].Generation - targetGen = sourceGen + 1 - } else { - // Generation 0 case - sourceGen = 0 - targetGen = 1 + // Group sources by generation and analyze completeness + generationAnalysis, err := logic.AnalyzeGenerationCompleteness(params) + if err != nil { + return 0, 0, fmt.Errorf("failed to analyze generation completeness: %w", err) } - // Validate consistency - all sources should have the same generation - for i, source := range params.Sources { - if source.Generation != sourceGen { - return 0, 0, fmt.Errorf("inconsistent generations in sources: source[0]=%d, source[%d]=%d", - sourceGen, i, source.Generation) - } + // Find the most complete generation that can be used for reconstruction + mostCompleteGen, found := logic.FindMostCompleteGeneration(generationAnalysis) + if !found { + return 0, 0, fmt.Errorf("no generation has sufficient shards for reconstruction") } - return sourceGen, targetGen, nil + // Target generation is max(all generations) + 1 + maxGen := logic.FindMaxGeneration(generationAnalysis) + targetGen = maxGen + 1 + + return mostCompleteGen, targetGen, nil } -// ParseSourceNodes extracts source node information from task parameters -func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[pb.ServerAddress]erasure_coding.ShardBits, error) { +// ParseSourceNodes extracts source node information from task parameters for a specific generation +func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams, targetGeneration uint32) (map[pb.ServerAddress]erasure_coding.ShardBits, error) { if params == nil { return nil, fmt.Errorf("task parameters cannot be nil") } @@ -85,7 +84,7 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[ sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits) for _, source := range params.Sources { - if source.Node == "" { + if source.Node == "" || source.Generation != targetGeneration { continue } @@ -105,7 +104,7 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[ } if len(sourceNodes) == 0 { - return nil, fmt.Errorf("no valid source nodes found: sources=%d", len(params.Sources)) + return nil, fmt.Errorf("no valid source nodes found for generation %d: sources=%d", targetGeneration, len(params.Sources)) } return sourceNodes, nil @@ -113,14 +112,14 @@ func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[ // CreateVacuumPlan creates a comprehensive plan for the EC vacuum operation func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string, params *worker_pb.TaskParams) (*VacuumPlan, error) { - // Extract generations + // Extract generations and analyze completeness sourceGen, targetGen, err := logic.DetermineGenerationsFromParams(params) if err != nil { return nil, fmt.Errorf("failed to determine generations: %w", err) } - // Parse source nodes - sourceNodes, err := logic.ParseSourceNodes(params) + // Parse source nodes from the selected generation + sourceNodes, err := logic.ParseSourceNodes(params, sourceGen) if err != nil { return nil, fmt.Errorf("failed to parse source nodes: %w", err) } @@ -137,8 +136,18 @@ func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string, Nodes: sourceNodes, // Same nodes, new generation } - // Determine what to cleanup (simplified: just source generation) - generationsToCleanup := []uint32{sourceGen} + // Get all available generations for cleanup calculation + generationAnalysis, err := logic.AnalyzeGenerationCompleteness(params) + if err != nil { + return nil, fmt.Errorf("failed to analyze generations for cleanup: %w", err) + } + + // All generations except target should be cleaned up + var allGenerations []uint32 + for generation := range generationAnalysis { + allGenerations = append(allGenerations, generation) + } + generationsToCleanup := logic.CalculateCleanupGenerations(sourceGen, targetGen, allGenerations) // Generate safety checks safetyChecks := logic.generateSafetyChecks(sourceDistribution, targetGen) @@ -243,6 +252,100 @@ type CleanupImpact struct { ShardsToDelete int } +// GenerationAnalysis represents the analysis of shard completeness per generation +type GenerationAnalysis struct { + Generation uint32 + ShardBits erasure_coding.ShardBits + ShardCount int + Nodes map[pb.ServerAddress]erasure_coding.ShardBits + CanReconstruct bool // Whether this generation has enough shards for reconstruction +} + +// AnalyzeGenerationCompleteness analyzes each generation's shard completeness +func (logic *EcVacuumLogic) AnalyzeGenerationCompleteness(params *worker_pb.TaskParams) (map[uint32]*GenerationAnalysis, error) { + if params == nil { + return nil, fmt.Errorf("task parameters cannot be nil") + } + + generationMap := make(map[uint32]*GenerationAnalysis) + + // Group sources by generation + for _, source := range params.Sources { + if source.Node == "" { + continue + } + + generation := source.Generation + if _, exists := generationMap[generation]; !exists { + generationMap[generation] = &GenerationAnalysis{ + Generation: generation, + ShardBits: erasure_coding.ShardBits(0), + Nodes: make(map[pb.ServerAddress]erasure_coding.ShardBits), + } + } + + analysis := generationMap[generation] + serverAddr := pb.ServerAddress(source.Node) + var shardBits erasure_coding.ShardBits + + // Convert shard IDs to ShardBits + for _, shardId := range source.ShardIds { + if shardId < erasure_coding.TotalShardsCount { + shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + } + + if shardBits.ShardIdCount() > 0 { + analysis.Nodes[serverAddr] = shardBits + analysis.ShardBits = analysis.ShardBits.Plus(shardBits) + } + } + + // Calculate completeness for each generation + for _, analysis := range generationMap { + analysis.ShardCount = analysis.ShardBits.ShardIdCount() + analysis.CanReconstruct = analysis.ShardCount >= erasure_coding.DataShardsCount + } + + return generationMap, nil +} + +// FindMostCompleteGeneration finds the generation with the most complete set of shards +// that can be used for reconstruction +func (logic *EcVacuumLogic) FindMostCompleteGeneration(generationMap map[uint32]*GenerationAnalysis) (uint32, bool) { + var bestGeneration uint32 + var bestShardCount int + found := false + + for generation, analysis := range generationMap { + // Only consider generations that can reconstruct + if !analysis.CanReconstruct { + continue + } + + // Prefer the generation with the most shards, or if tied, the highest generation number + if !found || analysis.ShardCount > bestShardCount || + (analysis.ShardCount == bestShardCount && generation > bestGeneration) { + bestGeneration = generation + bestShardCount = analysis.ShardCount + found = true + } + } + + return bestGeneration, found +} + +// FindMaxGeneration finds the highest generation number among all available generations +func (logic *EcVacuumLogic) FindMaxGeneration(generationMap map[uint32]*GenerationAnalysis) uint32 { + var maxGen uint32 + for generation := range generationMap { + if generation > maxGen { + maxGen = generation + } + } + return maxGen +} + // countShardsToDelete counts how many shard files will be deleted func (logic *EcVacuumLogic) countShardsToDelete(plan *VacuumPlan) int { totalShards := 0 diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go index e4bc242fa..60de16b05 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go @@ -35,7 +35,11 @@ func TestDetermineGenerationsFromParams(t *testing.T) { name: "generation 0 source", params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ - {Generation: 0}, + { + Node: "node1:8080", + Generation: 0, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient + }, }, }, expectSrc: 0, @@ -45,7 +49,11 @@ func TestDetermineGenerationsFromParams(t *testing.T) { name: "generation 1 source", params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ - {Generation: 1}, + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient + }, }, }, expectSrc: 1, @@ -55,29 +63,54 @@ func TestDetermineGenerationsFromParams(t *testing.T) { name: "generation 5 source", params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ - {Generation: 5}, + { + Node: "node1:8080", + Generation: 5, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient + }, }, }, expectSrc: 5, expectTgt: 6, }, { - name: "inconsistent generations", + name: "multiple generations - finds most complete", params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ - {Generation: 1}, - {Generation: 2}, // Different generation! + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2}, // Only 3 shards - insufficient + }, + { + Node: "node2:8080", + Generation: 2, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient + }, }, }, - expectError: true, + expectSrc: 2, // Should pick generation 2 (most complete) + expectTgt: 3, // Target should be max(1,2) + 1 = 3 }, { name: "multiple sources same generation", params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ - {Generation: 3}, - {Generation: 3}, - {Generation: 3}, + { + Node: "node1:8080", + Generation: 3, + ShardIds: []uint32{0, 1, 2, 3, 4}, + }, + { + Node: "node2:8080", + Generation: 3, + ShardIds: []uint32{5, 6, 7, 8, 9}, // Combined = 10 shards - sufficient + }, + { + Node: "node3:8080", + Generation: 3, + ShardIds: []uint32{10, 11, 12, 13}, + }, }, }, expectSrc: 3, @@ -118,6 +151,7 @@ func TestParseSourceNodes(t *testing.T) { tests := []struct { name string params *worker_pb.TaskParams + generation uint32 expectNodes int expectShards map[string][]int // node -> shard IDs expectError bool @@ -125,6 +159,7 @@ func TestParseSourceNodes(t *testing.T) { { name: "nil params", params: nil, + generation: 0, expectError: true, }, { @@ -132,6 +167,7 @@ func TestParseSourceNodes(t *testing.T) { params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{}, }, + generation: 0, expectError: true, }, { @@ -139,11 +175,13 @@ func TestParseSourceNodes(t *testing.T) { params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ { - Node: "node1:8080", - ShardIds: []uint32{0, 1, 2, 3, 4, 5}, + Node: "node1:8080", + Generation: 0, + ShardIds: []uint32{0, 1, 2, 3, 4, 5}, }, }, }, + generation: 0, expectNodes: 1, expectShards: map[string][]int{ "node1:8080": {0, 1, 2, 3, 4, 5}, @@ -154,19 +192,23 @@ func TestParseSourceNodes(t *testing.T) { params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ { - Node: "node1:8080", - ShardIds: []uint32{0, 1, 2, 3, 4}, + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2, 3, 4}, }, { - Node: "node2:8080", - ShardIds: []uint32{5, 6, 7, 8, 9}, + Node: "node2:8080", + Generation: 1, + ShardIds: []uint32{5, 6, 7, 8, 9}, }, { - Node: "node3:8080", - ShardIds: []uint32{10, 11, 12, 13}, + Node: "node3:8080", + Generation: 1, + ShardIds: []uint32{10, 11, 12, 13}, }, }, }, + generation: 1, expectNodes: 3, expectShards: map[string][]int{ "node1:8080": {0, 1, 2, 3, 4}, @@ -179,15 +221,18 @@ func TestParseSourceNodes(t *testing.T) { params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ { - Node: "node1:8080", - ShardIds: []uint32{0, 1, 2}, + Node: "node1:8080", + Generation: 2, + ShardIds: []uint32{0, 1, 2}, }, { - Node: "node2:8080", - ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes + Node: "node2:8080", + Generation: 2, + ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes }, }, }, + generation: 2, expectNodes: 2, expectShards: map[string][]int{ "node1:8080": {0, 1, 2}, @@ -199,15 +244,18 @@ func TestParseSourceNodes(t *testing.T) { params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ { - Node: "", - ShardIds: []uint32{0, 1, 2}, + Node: "", + Generation: 3, + ShardIds: []uint32{0, 1, 2}, }, { - Node: "node1:8080", - ShardIds: []uint32{3, 4, 5}, + Node: "node1:8080", + Generation: 3, + ShardIds: []uint32{3, 4, 5}, }, }, }, + generation: 3, expectNodes: 1, expectShards: map[string][]int{ "node1:8080": {3, 4, 5}, @@ -218,21 +266,51 @@ func TestParseSourceNodes(t *testing.T) { params: &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ { - Node: "node1:8080", - ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid + Node: "node1:8080", + Generation: 4, + ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid }, }, }, + generation: 4, expectNodes: 1, expectShards: map[string][]int{ "node1:8080": {0, 1}, // Only valid shards }, }, + { + name: "filter by generation - only matching generation", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2}, + }, + { + Node: "node2:8080", + Generation: 2, // Different generation - should be ignored + ShardIds: []uint32{3, 4, 5}, + }, + { + Node: "node3:8080", + Generation: 1, // Same generation - should be included + ShardIds: []uint32{6, 7, 8}, + }, + }, + }, + generation: 1, + expectNodes: 2, + expectShards: map[string][]int{ + "node1:8080": {0, 1, 2}, + "node3:8080": {6, 7, 8}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sourceNodes, err := logic.ParseSourceNodes(tt.params) + sourceNodes, err := logic.ParseSourceNodes(tt.params, tt.generation) if tt.expectError { if err == nil { @@ -685,3 +763,354 @@ func createRealisticTopologyTest(t *testing.T) { func TestRealisticTopologyScenarios(t *testing.T) { t.Run("3-node distributed shards", createRealisticTopologyTest) } + +func TestAnalyzeGenerationCompleteness(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + params *worker_pb.TaskParams + expectedGenerations []uint32 + expectedCanReconstruct map[uint32]bool + expectError bool + }{ + { + name: "nil params", + params: nil, + expectError: true, + }, + { + name: "single generation sufficient shards", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards = sufficient + }, + }, + }, + expectedGenerations: []uint32{1}, + expectedCanReconstruct: map[uint32]bool{1: true}, + }, + { + name: "single generation insufficient shards", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2}, // Only 3 shards = insufficient + }, + }, + }, + expectedGenerations: []uint32{1}, + expectedCanReconstruct: map[uint32]bool{1: false}, + }, + { + name: "multiple generations mixed completeness", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2}, // 3 shards - insufficient + }, + { + Node: "node2:8080", + Generation: 2, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // 10 shards - sufficient + }, + { + Node: "node3:8080", + Generation: 3, + ShardIds: []uint32{0, 1, 2, 3, 4, 5}, // 6 shards - insufficient + }, + }, + }, + expectedGenerations: []uint32{1, 2, 3}, + expectedCanReconstruct: map[uint32]bool{1: false, 2: true, 3: false}, + }, + { + name: "multiple nodes same generation", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2, 3, 4}, + }, + { + Node: "node2:8080", + Generation: 1, + ShardIds: []uint32{5, 6, 7, 8, 9}, // Together = 10 shards = sufficient + }, + }, + }, + expectedGenerations: []uint32{1}, + expectedCanReconstruct: map[uint32]bool{1: true}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + analysis, err := logic.AnalyzeGenerationCompleteness(tt.params) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + // Check we have the expected generations + if len(analysis) != len(tt.expectedGenerations) { + t.Errorf("generation count: expected %d, got %d", len(tt.expectedGenerations), len(analysis)) + return + } + + for _, expectedGen := range tt.expectedGenerations { + genAnalysis, exists := analysis[expectedGen] + if !exists { + t.Errorf("expected generation %d not found", expectedGen) + continue + } + + expectedCanReconstruct := tt.expectedCanReconstruct[expectedGen] + if genAnalysis.CanReconstruct != expectedCanReconstruct { + t.Errorf("generation %d CanReconstruct: expected %v, got %v", + expectedGen, expectedCanReconstruct, genAnalysis.CanReconstruct) + } + } + }) + } +} + +func TestFindMostCompleteGeneration(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + generationAnalysis map[uint32]*GenerationAnalysis + expectedGeneration uint32 + expectedFound bool + }{ + { + name: "empty analysis", + generationAnalysis: map[uint32]*GenerationAnalysis{}, + expectedFound: false, + }, + { + name: "single reconstructable generation", + generationAnalysis: map[uint32]*GenerationAnalysis{ + 1: {Generation: 1, ShardCount: 10, CanReconstruct: true}, + }, + expectedGeneration: 1, + expectedFound: true, + }, + { + name: "no reconstructable generations", + generationAnalysis: map[uint32]*GenerationAnalysis{ + 1: {Generation: 1, ShardCount: 5, CanReconstruct: false}, + 2: {Generation: 2, ShardCount: 3, CanReconstruct: false}, + }, + expectedFound: false, + }, + { + name: "multiple reconstructable - picks most complete", + generationAnalysis: map[uint32]*GenerationAnalysis{ + 1: {Generation: 1, ShardCount: 10, CanReconstruct: true}, + 2: {Generation: 2, ShardCount: 14, CanReconstruct: true}, // Most complete + 3: {Generation: 3, ShardCount: 12, CanReconstruct: true}, + }, + expectedGeneration: 2, + expectedFound: true, + }, + { + name: "tie in shard count - picks higher generation", + generationAnalysis: map[uint32]*GenerationAnalysis{ + 1: {Generation: 1, ShardCount: 10, CanReconstruct: true}, + 2: {Generation: 2, ShardCount: 10, CanReconstruct: true}, // Same count, higher generation + }, + expectedGeneration: 2, + expectedFound: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + generation, found := logic.FindMostCompleteGeneration(tt.generationAnalysis) + + if found != tt.expectedFound { + t.Errorf("found: expected %v, got %v", tt.expectedFound, found) + return + } + + if tt.expectedFound && generation != tt.expectedGeneration { + t.Errorf("generation: expected %d, got %d", tt.expectedGeneration, generation) + } + }) + } +} + +func TestFindMaxGeneration(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + generationAnalysis map[uint32]*GenerationAnalysis + expectedMax uint32 + }{ + { + name: "empty analysis", + generationAnalysis: map[uint32]*GenerationAnalysis{}, + expectedMax: 0, + }, + { + name: "single generation", + generationAnalysis: map[uint32]*GenerationAnalysis{ + 5: {Generation: 5}, + }, + expectedMax: 5, + }, + { + name: "multiple generations", + generationAnalysis: map[uint32]*GenerationAnalysis{ + 1: {Generation: 1}, + 5: {Generation: 5}, + 3: {Generation: 3}, + 7: {Generation: 7}, // Highest + }, + expectedMax: 7, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + maxGen := logic.FindMaxGeneration(tt.generationAnalysis) + + if maxGen != tt.expectedMax { + t.Errorf("max generation: expected %d, got %d", tt.expectedMax, maxGen) + } + }) + } +} + +func TestMultiGenerationVacuumScenarios(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + params *worker_pb.TaskParams + expectedSourceGen uint32 + expectedTargetGen uint32 + expectedCleanupCount int + expectError bool + }{ + { + name: "corrupted generation 1, good generation 2", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2}, // Insufficient - corrupted data + }, + { + Node: "node2:8080", + Generation: 2, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, // Complete - good data + }, + }, + }, + expectedSourceGen: 2, // Should use generation 2 + expectedTargetGen: 3, // max(1,2) + 1 = 3 + expectedCleanupCount: 2, // Clean up generations 1 and 2 + }, + { + name: "multiple old generations, one current good", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 0, + ShardIds: []uint32{0, 1}, // Old incomplete + }, + { + Node: "node2:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2, 3}, // Old incomplete + }, + { + Node: "node3:8080", + Generation: 2, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, // Complete - all shards + }, + }, + }, + expectedSourceGen: 2, // Should use generation 2 (most complete) + expectedTargetGen: 3, // max(0,1,2) + 1 = 3 + expectedCleanupCount: 3, // Clean up generations 0, 1, and 2 + }, + { + name: "no sufficient generations", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2}, // Only 3 shards - insufficient + }, + { + Node: "node2:8080", + Generation: 2, + ShardIds: []uint32{0, 1}, // Only 2 shards - insufficient + }, + }, + }, + expectError: true, // No generation has enough shards + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plan, err := logic.CreateVacuumPlan(123, "test", tt.params) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + if plan.CurrentGeneration != tt.expectedSourceGen { + t.Errorf("source generation: expected %d, got %d", tt.expectedSourceGen, plan.CurrentGeneration) + } + + if plan.TargetGeneration != tt.expectedTargetGen { + t.Errorf("target generation: expected %d, got %d", tt.expectedTargetGen, plan.TargetGeneration) + } + + if len(plan.GenerationsToCleanup) != tt.expectedCleanupCount { + t.Errorf("cleanup count: expected %d, got %d", tt.expectedCleanupCount, len(plan.GenerationsToCleanup)) + } + + // Verify cleanup generations don't include target + for _, gen := range plan.GenerationsToCleanup { + if gen == plan.TargetGeneration { + t.Errorf("cleanup generations should not include target generation %d", plan.TargetGeneration) + } + } + }) + } +} |
